diff --git a/go.mod b/go.mod index b7a73fd58..c22495a4c 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 github.com/prometheus/common v0.44.0 - github.com/segmentio/kafka-go v0.4.38 + github.com/segmentio/kafka-go v0.4.43 github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 @@ -93,8 +93,9 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/stretchr/objx v0.5.0 // indirect github.com/subosito/gotenv v1.4.2 // indirect - github.com/xdg/scram v1.0.5 // indirect - github.com/xdg/stringprep v1.0.3 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/crypto v0.12.0 // indirect golang.org/x/oauth2 v0.10.0 // indirect diff --git a/go.sum b/go.sum index 0aefe609c..91e945a74 100644 --- a/go.sum +++ b/go.sum @@ -767,8 +767,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= -github.com/segmentio/kafka-go v0.4.38 h1:iQdOBbUSdfuYlFpvjuALgj7N6DrdPA0HfB4AhREOdtg= -github.com/segmentio/kafka-go v0.4.38/go.mod h1:ikyuGon/60MN/vXFgykf7Zm8P5Be49gJU6vezwjnnhU= +github.com/segmentio/kafka-go v0.4.43 h1:yKVQ/i6BobbX7AWzwkhulsEn47wpLA8eO6H03bCMqYg= +github.com/segmentio/kafka-go v0.4.43/go.mod h1:d0g15xPMqoUookug0OU75DhGZxXwCFxSLeJ4uphwJzg= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= @@ -843,12 +843,14 @@ github.com/vladimirvivien/gexe v0.2.0/go.mod h1:LHQL00w/7gDUKIak24n801ABp8C+ni6e github.com/vmware/go-ipfix v0.8.0 h1:9jeeMppLHU6KxCz6BHtDw0YW7Von5MKyz03p9fNT5JI= github.com/vmware/go-ipfix v0.8.0/go.mod h1:Y3YKMFN/Nec6QwmXcDae+uy6xuDgbejwRAZv9RTzS9c= github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= -github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw= -github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= -github.com/xdg/stringprep v1.0.3 h1:cmL5Enob4W83ti/ZHuZLuKD/xqJfus4fVPwE+/BDm+4= -github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= github.com/xlab/treeprint v1.0.0/go.mod h1:IoImgRak9i3zJyuxOKUP1v4UZd1tMoKkq/Cimt1uhCg= @@ -906,7 +908,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= @@ -996,10 +997,10 @@ golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20220706163947-c90051bbdb60/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1113,6 +1114,7 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0= golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1123,8 +1125,10 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/vendor/github.com/segmentio/kafka-go/README.md b/vendor/github.com/segmentio/kafka-go/README.md index 20f20e682..304c1603b 100644 --- a/vendor/github.com/segmentio/kafka-go/README.md +++ b/vendor/github.com/segmentio/kafka-go/README.md @@ -225,7 +225,6 @@ r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"}, Topic: "topic-A", Partition: 0, - MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB }) r.SetOffset(42) @@ -256,7 +255,6 @@ r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", - MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB }) @@ -320,7 +318,6 @@ r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", - MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB CommitInterval: time.Second, // flushes commits to Kafka every second }) @@ -412,6 +409,7 @@ for i := 0; i < retries; i++ { if err != nil { log.Fatalf("unexpected error %v", err) } + break } if err := w.Close(); err != nil { @@ -718,7 +716,6 @@ r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, Topic: "my-topic1", Partition: 0, - MinBytes: batchSize, MaxBytes: batchSize, }) diff --git a/vendor/github.com/segmentio/kafka-go/alterclientquotas.go b/vendor/github.com/segmentio/kafka-go/alterclientquotas.go new file mode 100644 index 000000000..7a926e5c4 --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/alterclientquotas.go @@ -0,0 +1,131 @@ +package kafka + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/segmentio/kafka-go/protocol/alterclientquotas" +) + +// AlterClientQuotasRequest represents a request sent to a kafka broker to +// alter client quotas. +type AlterClientQuotasRequest struct { + // Address of the kafka broker to send the request to. + Addr net.Addr + + // List of client quotas entries to alter. + Entries []AlterClientQuotaEntry + + // Whether the alteration should be validated, but not performed. + ValidateOnly bool +} + +type AlterClientQuotaEntry struct { + // The quota entities to alter. + Entities []AlterClientQuotaEntity + + // An individual quota configuration entry to alter. + Ops []AlterClientQuotaOps +} + +type AlterClientQuotaEntity struct { + // The quota entity type. + EntityType string + + // The name of the quota entity, or null if the default. + EntityName string +} + +type AlterClientQuotaOps struct { + // The quota configuration key. + Key string + + // The quota configuration value to set, otherwise ignored if the value is to be removed. + Value float64 + + // Whether the quota configuration value should be removed, otherwise set. + Remove bool +} + +type AlterClientQuotaResponseQuotas struct { + // Error is set to a non-nil value including the code and message if a top-level + // error was encountered when doing the update. + Error error + + // The altered quota entities. + Entities []AlterClientQuotaEntity +} + +// AlterClientQuotasResponse represents a response from a kafka broker to an alter client +// quotas request. +type AlterClientQuotasResponse struct { + // The amount of time that the broker throttled the request. + Throttle time.Duration + + // List of altered client quotas responses. + Entries []AlterClientQuotaResponseQuotas +} + +// AlterClientQuotas sends client quotas alteration request to a kafka broker and returns +// the response. +func (c *Client) AlterClientQuotas(ctx context.Context, req *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) { + entries := make([]alterclientquotas.Entry, len(req.Entries)) + + for entryIdx, entry := range req.Entries { + entities := make([]alterclientquotas.Entity, len(entry.Entities)) + for entityIdx, entity := range entry.Entities { + entities[entityIdx] = alterclientquotas.Entity{ + EntityType: entity.EntityType, + EntityName: entity.EntityName, + } + } + + ops := make([]alterclientquotas.Ops, len(entry.Ops)) + for opsIdx, op := range entry.Ops { + ops[opsIdx] = alterclientquotas.Ops{ + Key: op.Key, + Value: op.Value, + Remove: op.Remove, + } + } + + entries[entryIdx] = alterclientquotas.Entry{ + Entities: entities, + Ops: ops, + } + } + + m, err := c.roundTrip(ctx, req.Addr, &alterclientquotas.Request{ + Entries: entries, + ValidateOnly: req.ValidateOnly, + }) + if err != nil { + return nil, fmt.Errorf("kafka.(*Client).AlterClientQuotas: %w", err) + } + + res := m.(*alterclientquotas.Response) + responseEntries := make([]AlterClientQuotaResponseQuotas, len(res.Results)) + + for responseEntryIdx, responseEntry := range res.Results { + responseEntities := make([]AlterClientQuotaEntity, len(responseEntry.Entities)) + for responseEntityIdx, responseEntity := range responseEntry.Entities { + responseEntities[responseEntityIdx] = AlterClientQuotaEntity{ + EntityType: responseEntity.EntityType, + EntityName: responseEntity.EntityName, + } + } + + responseEntries[responseEntryIdx] = AlterClientQuotaResponseQuotas{ + Error: makeError(responseEntry.ErrorCode, responseEntry.ErrorMessage), + Entities: responseEntities, + } + } + ret := &AlterClientQuotasResponse{ + Throttle: makeDuration(res.ThrottleTimeMs), + Entries: responseEntries, + } + + return ret, nil +} diff --git a/vendor/github.com/segmentio/kafka-go/alteruserscramcredentials.go b/vendor/github.com/segmentio/kafka-go/alteruserscramcredentials.go new file mode 100644 index 000000000..6163e564e --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/alteruserscramcredentials.go @@ -0,0 +1,107 @@ +package kafka + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/segmentio/kafka-go/protocol/alteruserscramcredentials" +) + +// AlterUserScramCredentialsRequest represents a request sent to a kafka broker to +// alter user scram credentials. +type AlterUserScramCredentialsRequest struct { + // Address of the kafka broker to send the request to. + Addr net.Addr + + // List of credentials to delete. + Deletions []UserScramCredentialsDeletion + + // List of credentials to upsert. + Upsertions []UserScramCredentialsUpsertion +} + +type ScramMechanism int8 + +const ( + ScramMechanismUnknown ScramMechanism = iota // 0 + ScramMechanismSha256 // 1 + ScramMechanismSha512 // 2 +) + +type UserScramCredentialsDeletion struct { + Name string + Mechanism ScramMechanism +} + +type UserScramCredentialsUpsertion struct { + Name string + Mechanism ScramMechanism + Iterations int + Salt []byte + SaltedPassword []byte +} + +// AlterUserScramCredentialsResponse represents a response from a kafka broker to an alter user +// credentials request. +type AlterUserScramCredentialsResponse struct { + // The amount of time that the broker throttled the request. + Throttle time.Duration + + // List of altered user scram credentials. + Results []AlterUserScramCredentialsResponseUser +} + +type AlterUserScramCredentialsResponseUser struct { + User string + Error error +} + +// AlterUserScramCredentials sends user scram credentials alteration request to a kafka broker and returns +// the response. +func (c *Client) AlterUserScramCredentials(ctx context.Context, req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) { + deletions := make([]alteruserscramcredentials.RequestUserScramCredentialsDeletion, len(req.Deletions)) + upsertions := make([]alteruserscramcredentials.RequestUserScramCredentialsUpsertion, len(req.Upsertions)) + + for deletionIdx, deletion := range req.Deletions { + deletions[deletionIdx] = alteruserscramcredentials.RequestUserScramCredentialsDeletion{ + Name: deletion.Name, + Mechanism: int8(deletion.Mechanism), + } + } + + for upsertionIdx, upsertion := range req.Upsertions { + upsertions[upsertionIdx] = alteruserscramcredentials.RequestUserScramCredentialsUpsertion{ + Name: upsertion.Name, + Mechanism: int8(upsertion.Mechanism), + Iterations: int32(upsertion.Iterations), + Salt: upsertion.Salt, + SaltedPassword: upsertion.SaltedPassword, + } + } + + m, err := c.roundTrip(ctx, req.Addr, &alteruserscramcredentials.Request{ + Deletions: deletions, + Upsertions: upsertions, + }) + if err != nil { + return nil, fmt.Errorf("kafka.(*Client).AlterUserScramCredentials: %w", err) + } + + res := m.(*alteruserscramcredentials.Response) + responseEntries := make([]AlterUserScramCredentialsResponseUser, len(res.Results)) + + for responseIdx, responseResult := range res.Results { + responseEntries[responseIdx] = AlterUserScramCredentialsResponseUser{ + User: responseResult.User, + Error: makeError(responseResult.ErrorCode, responseResult.ErrorMessage), + } + } + ret := &AlterUserScramCredentialsResponse{ + Throttle: makeDuration(res.ThrottleTimeMs), + Results: responseEntries, + } + + return ret, nil +} diff --git a/vendor/github.com/segmentio/kafka-go/balancer.go b/vendor/github.com/segmentio/kafka-go/balancer.go index cd2e8c1c4..f4768cf88 100644 --- a/vendor/github.com/segmentio/kafka-go/balancer.go +++ b/vendor/github.com/segmentio/kafka-go/balancer.go @@ -260,7 +260,7 @@ func (b CRC32Balancer) Balance(msg Message, partitions ...int) (partition int) { // determine which partition to route messages to. This ensures that messages // with the same key are routed to the same partition. This balancer is // compatible with the partitioner used by the Java library and by librdkafka's -// "murmur2" and "murmur2_random" partitioners. / +// "murmur2" and "murmur2_random" partitioners. // // With the Consistent field false (default), this partitioner is equivalent to // the "murmur2_random" setting in librdkafka. When Consistent is true, this diff --git a/vendor/github.com/segmentio/kafka-go/batch.go b/vendor/github.com/segmentio/kafka-go/batch.go index f9f3e5227..19dcef8cd 100644 --- a/vendor/github.com/segmentio/kafka-go/batch.go +++ b/vendor/github.com/segmentio/kafka-go/batch.go @@ -79,10 +79,16 @@ func (batch *Batch) close() (err error) { batch.conn = nil batch.lock = nil + if batch.msgs != nil { batch.msgs.discard() } + if batch.msgs != nil && batch.msgs.decompressed != nil { + releaseBuffer(batch.msgs.decompressed) + batch.msgs.decompressed = nil + } + if err = batch.err; errors.Is(batch.err, io.EOF) { err = nil } diff --git a/vendor/github.com/segmentio/kafka-go/compress/compress.go b/vendor/github.com/segmentio/kafka-go/compress/compress.go index 6e92968f2..054bf03d0 100644 --- a/vendor/github.com/segmentio/kafka-go/compress/compress.go +++ b/vendor/github.com/segmentio/kafka-go/compress/compress.go @@ -13,7 +13,7 @@ import ( "github.com/segmentio/kafka-go/compress/zstd" ) -// Compression represents the the compression applied to a record set. +// Compression represents the compression applied to a record set. type Compression int8 const ( diff --git a/vendor/github.com/segmentio/kafka-go/conn.go b/vendor/github.com/segmentio/kafka-go/conn.go index aa84f5a49..2b51afbd5 100644 --- a/vendor/github.com/segmentio/kafka-go/conn.go +++ b/vendor/github.com/segmentio/kafka-go/conn.go @@ -9,7 +9,6 @@ import ( "net" "os" "path/filepath" - "runtime" "sync" "sync/atomic" "time" @@ -1419,7 +1418,6 @@ func (c *Conn) waitResponse(d *connDeadline, id int32) (deadline time.Time, size // Optimistically release the read lock if a response has already // been received but the current operation is not the target for it. c.rlock.Unlock() - runtime.Gosched() } c.leave() diff --git a/vendor/github.com/segmentio/kafka-go/createtopics.go b/vendor/github.com/segmentio/kafka-go/createtopics.go index 6767e07c8..8ad9ebf44 100644 --- a/vendor/github.com/segmentio/kafka-go/createtopics.go +++ b/vendor/github.com/segmentio/kafka-go/createtopics.go @@ -3,7 +3,6 @@ package kafka import ( "bufio" "context" - "errors" "fmt" "net" "time" @@ -23,7 +22,7 @@ type CreateTopicsRequest struct { // When set to true, topics are not created but the configuration is // validated as if they were. // - // This field will be ignored if the kafka broker did no support the + // This field will be ignored if the kafka broker did not support the // CreateTopics API in version 1 or above. ValidateOnly bool } @@ -33,7 +32,7 @@ type CreateTopicsRequest struct { type CreateTopicsResponse struct { // The amount of time that the broker throttled the request. // - // This field will be zero if the kafka broker did no support the + // This field will be zero if the kafka broker did not support the // CreateTopics API in version 2 or above. Throttle time.Duration @@ -65,7 +64,6 @@ func (c *Client) CreateTopics(ctx context.Context, req *CreateTopicsRequest) (*C TimeoutMs: c.timeoutMs(ctx, defaultCreateTopicsTimeout), ValidateOnly: req.ValidateOnly, }) - if err != nil { return nil, fmt.Errorf("kafka.(*Client).CreateTopics: %w", err) } @@ -363,6 +361,9 @@ func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponse return response, err } for _, tr := range response.TopicErrors { + if tr.ErrorCode == int16(TopicAlreadyExists) { + continue + } if tr.ErrorCode != 0 { return response, Error(tr.ErrorCode) } @@ -385,14 +386,5 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error { _, err := c.createTopics(createTopicsRequestV0{ Topics: requestV0Topics, }) - if err != nil { - if errors.Is(err, TopicAlreadyExists) { - // ok - return nil - } - - return err - } - - return nil + return err } diff --git a/vendor/github.com/segmentio/kafka-go/deleteacls.go b/vendor/github.com/segmentio/kafka-go/deleteacls.go new file mode 100644 index 000000000..64cbd26d1 --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/deleteacls.go @@ -0,0 +1,114 @@ +package kafka + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/segmentio/kafka-go/protocol/deleteacls" +) + +// DeleteACLsRequest represents a request sent to a kafka broker to delete +// ACLs. +type DeleteACLsRequest struct { + // Address of the kafka broker to send the request to. + Addr net.Addr + + // List of ACL filters to use for deletion. + Filters []DeleteACLsFilter +} + +type DeleteACLsFilter struct { + ResourceTypeFilter ResourceType + ResourceNameFilter string + ResourcePatternTypeFilter PatternType + PrincipalFilter string + HostFilter string + Operation ACLOperationType + PermissionType ACLPermissionType +} + +// DeleteACLsResponse represents a response from a kafka broker to an ACL +// deletion request. +type DeleteACLsResponse struct { + // The amount of time that the broker throttled the request. + Throttle time.Duration + + // List of the results from the deletion request. + Results []DeleteACLsResult +} + +type DeleteACLsResult struct { + Error error + MatchingACLs []DeleteACLsMatchingACLs +} + +type DeleteACLsMatchingACLs struct { + Error error + ResourceType ResourceType + ResourceName string + ResourcePatternType PatternType + Principal string + Host string + Operation ACLOperationType + PermissionType ACLPermissionType +} + +// DeleteACLs sends ACLs deletion request to a kafka broker and returns the +// response. +func (c *Client) DeleteACLs(ctx context.Context, req *DeleteACLsRequest) (*DeleteACLsResponse, error) { + filters := make([]deleteacls.RequestFilter, 0, len(req.Filters)) + + for _, filter := range req.Filters { + filters = append(filters, deleteacls.RequestFilter{ + ResourceTypeFilter: int8(filter.ResourceTypeFilter), + ResourceNameFilter: filter.ResourceNameFilter, + ResourcePatternTypeFilter: int8(filter.ResourcePatternTypeFilter), + PrincipalFilter: filter.PrincipalFilter, + HostFilter: filter.HostFilter, + Operation: int8(filter.Operation), + PermissionType: int8(filter.PermissionType), + }) + } + + m, err := c.roundTrip(ctx, req.Addr, &deleteacls.Request{ + Filters: filters, + }) + if err != nil { + return nil, fmt.Errorf("kafka.(*Client).DeleteACLs: %w", err) + } + + res := m.(*deleteacls.Response) + + results := make([]DeleteACLsResult, 0, len(res.FilterResults)) + + for _, result := range res.FilterResults { + matchingACLs := make([]DeleteACLsMatchingACLs, 0, len(result.MatchingACLs)) + + for _, matchingACL := range result.MatchingACLs { + matchingACLs = append(matchingACLs, DeleteACLsMatchingACLs{ + Error: makeError(matchingACL.ErrorCode, matchingACL.ErrorMessage), + ResourceType: ResourceType(matchingACL.ResourceType), + ResourceName: matchingACL.ResourceName, + ResourcePatternType: PatternType(matchingACL.ResourcePatternType), + Principal: matchingACL.Principal, + Host: matchingACL.Host, + Operation: ACLOperationType(matchingACL.Operation), + PermissionType: ACLPermissionType(matchingACL.PermissionType), + }) + } + + results = append(results, DeleteACLsResult{ + Error: makeError(result.ErrorCode, result.ErrorMessage), + MatchingACLs: matchingACLs, + }) + } + + ret := &DeleteACLsResponse{ + Throttle: makeDuration(res.ThrottleTimeMs), + Results: results, + } + + return ret, nil +} diff --git a/vendor/github.com/segmentio/kafka-go/deletegroups.go b/vendor/github.com/segmentio/kafka-go/deletegroups.go new file mode 100644 index 000000000..6317ae7fa --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/deletegroups.go @@ -0,0 +1,60 @@ +package kafka + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/segmentio/kafka-go/protocol/deletegroups" +) + +// DeleteGroupsRequest represents a request sent to a kafka broker to delete +// consumer groups. +type DeleteGroupsRequest struct { + // Address of the kafka broker to send the request to. + Addr net.Addr + + // Identifiers of groups to delete. + GroupIDs []string +} + +// DeleteGroupsResponse represents a response from a kafka broker to a consumer group +// deletion request. +type DeleteGroupsResponse struct { + // The amount of time that the broker throttled the request. + Throttle time.Duration + + // Mapping of group ids to errors that occurred while attempting to delete those groups. + // + // The errors contain the kafka error code. Programs may use the standard + // errors.Is function to test the error against kafka error codes. + Errors map[string]error +} + +// DeleteGroups sends a delete groups request and returns the response. The request is sent to the group coordinator of the first group +// of the request. All deleted groups must be managed by the same group coordinator. +func (c *Client) DeleteGroups( + ctx context.Context, + req *DeleteGroupsRequest, +) (*DeleteGroupsResponse, error) { + m, err := c.roundTrip(ctx, req.Addr, &deletegroups.Request{ + GroupIDs: req.GroupIDs, + }) + if err != nil { + return nil, fmt.Errorf("kafka.(*Client).DeleteGroups: %w", err) + } + + r := m.(*deletegroups.Response) + + ret := &DeleteGroupsResponse{ + Throttle: makeDuration(r.ThrottleTimeMs), + Errors: make(map[string]error, len(r.Responses)), + } + + for _, t := range r.Responses { + ret.Errors[t.GroupID] = makeError(t.ErrorCode, "") + } + + return ret, nil +} diff --git a/vendor/github.com/segmentio/kafka-go/deletetopics.go b/vendor/github.com/segmentio/kafka-go/deletetopics.go index 470f9ef83..d758d9fd6 100644 --- a/vendor/github.com/segmentio/kafka-go/deletetopics.go +++ b/vendor/github.com/segmentio/kafka-go/deletetopics.go @@ -25,7 +25,7 @@ type DeleteTopicsRequest struct { type DeleteTopicsResponse struct { // The amount of time that the broker throttled the request. // - // This field will be zero if the kafka broker did no support the + // This field will be zero if the kafka broker did not support the // DeleteTopics API in version 1 or above. Throttle time.Duration diff --git a/vendor/github.com/segmentio/kafka-go/describeacls.go b/vendor/github.com/segmentio/kafka-go/describeacls.go new file mode 100644 index 000000000..d1093bbed --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/describeacls.go @@ -0,0 +1,107 @@ +package kafka + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/segmentio/kafka-go/protocol/describeacls" +) + +// DescribeACLsRequest represents a request sent to a kafka broker to describe +// existing ACLs. +type DescribeACLsRequest struct { + // Address of the kafka broker to send the request to. + Addr net.Addr + + // Filter to filter ACLs on. + Filter ACLFilter +} + +type ACLFilter struct { + ResourceTypeFilter ResourceType + ResourceNameFilter string + // ResourcePatternTypeFilter was added in v1 and is not available prior to that. + ResourcePatternTypeFilter PatternType + PrincipalFilter string + HostFilter string + Operation ACLOperationType + PermissionType ACLPermissionType +} + +// DescribeACLsResponse represents a response from a kafka broker to an ACL +// describe request. +type DescribeACLsResponse struct { + // The amount of time that the broker throttled the request. + Throttle time.Duration + + // Error that occurred while attempting to describe + // the ACLs. + Error error + + // ACL resources returned from the describe request. + Resources []ACLResource +} + +type ACLResource struct { + ResourceType ResourceType + ResourceName string + PatternType PatternType + ACLs []ACLDescription +} + +type ACLDescription struct { + Principal string + Host string + Operation ACLOperationType + PermissionType ACLPermissionType +} + +func (c *Client) DescribeACLs(ctx context.Context, req *DescribeACLsRequest) (*DescribeACLsResponse, error) { + m, err := c.roundTrip(ctx, req.Addr, &describeacls.Request{ + Filter: describeacls.ACLFilter{ + ResourceTypeFilter: int8(req.Filter.ResourceTypeFilter), + ResourceNameFilter: req.Filter.ResourceNameFilter, + ResourcePatternTypeFilter: int8(req.Filter.ResourcePatternTypeFilter), + PrincipalFilter: req.Filter.PrincipalFilter, + HostFilter: req.Filter.HostFilter, + Operation: int8(req.Filter.Operation), + PermissionType: int8(req.Filter.PermissionType), + }, + }) + if err != nil { + return nil, fmt.Errorf("kafka.(*Client).DescribeACLs: %w", err) + } + + res := m.(*describeacls.Response) + resources := make([]ACLResource, len(res.Resources)) + + for resourceIdx, respResource := range res.Resources { + descriptions := make([]ACLDescription, len(respResource.ACLs)) + + for descriptionIdx, respDescription := range respResource.ACLs { + descriptions[descriptionIdx] = ACLDescription{ + Principal: respDescription.Principal, + Host: respDescription.Host, + Operation: ACLOperationType(respDescription.Operation), + PermissionType: ACLPermissionType(respDescription.PermissionType), + } + } + + resources[resourceIdx] = ACLResource{ + ResourceType: ResourceType(respResource.ResourceType), + ResourceName: respResource.ResourceName, + PatternType: PatternType(respResource.PatternType), + ACLs: descriptions, + } + } + + ret := &DescribeACLsResponse{ + Throttle: makeDuration(res.ThrottleTimeMs), + Error: makeError(res.ErrorCode, res.ErrorMessage), + Resources: resources, + } + + return ret, nil +} diff --git a/vendor/github.com/segmentio/kafka-go/describeclientquotas.go b/vendor/github.com/segmentio/kafka-go/describeclientquotas.go new file mode 100644 index 000000000..6291dcd98 --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/describeclientquotas.go @@ -0,0 +1,126 @@ +package kafka + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/segmentio/kafka-go/protocol/describeclientquotas" +) + +// DescribeClientQuotasRequest represents a request sent to a kafka broker to +// describe client quotas. +type DescribeClientQuotasRequest struct { + // Address of the kafka broker to send the request to + Addr net.Addr + + // List of quota components to describe. + Components []DescribeClientQuotasRequestComponent + + // Whether the match is strict, i.e. should exclude entities with + // unspecified entity types. + Strict bool +} + +type DescribeClientQuotasRequestComponent struct { + // The entity type that the filter component applies to. + EntityType string + + // How to match the entity (0 = exact name, 1 = default name, + // 2 = any specified name). + MatchType int8 + + // The string to match against, or null if unused for the match type. + Match string +} + +// DescribeClientQuotasReesponse represents a response from a kafka broker to a describe client quota request. +type DescribeClientQuotasResponse struct { + // The amount of time that the broker throttled the request. + Throttle time.Duration + + // Error is set to a non-nil value including the code and message if a top-level + // error was encountered when doing the update. + Error error + + // List of describe client quota responses. + Entries []DescribeClientQuotasResponseQuotas +} + +type DescribeClientQuotasEntity struct { + // The quota entity type. + EntityType string + + // The name of the quota entity, or null if the default. + EntityName string +} + +type DescribeClientQuotasValue struct { + // The quota configuration key. + Key string + + // The quota configuration value. + Value float64 +} + +type DescribeClientQuotasResponseQuotas struct { + // List of client quota entities and their descriptions. + Entities []DescribeClientQuotasEntity + + // The client quota configuration values. + Values []DescribeClientQuotasValue +} + +// DescribeClientQuotas sends a describe client quotas request to a kafka broker and returns +// the response. +func (c *Client) DescribeClientQuotas(ctx context.Context, req *DescribeClientQuotasRequest) (*DescribeClientQuotasResponse, error) { + components := make([]describeclientquotas.Component, len(req.Components)) + + for componentIdx, component := range req.Components { + components[componentIdx] = describeclientquotas.Component{ + EntityType: component.EntityType, + MatchType: component.MatchType, + Match: component.Match, + } + } + + m, err := c.roundTrip(ctx, req.Addr, &describeclientquotas.Request{ + Components: components, + Strict: req.Strict, + }) + if err != nil { + return nil, fmt.Errorf("kafka.(*Client).DescribeClientQuotas: %w", err) + } + + res := m.(*describeclientquotas.Response) + responseEntries := make([]DescribeClientQuotasResponseQuotas, len(res.Entries)) + + for responseEntryIdx, responseEntry := range res.Entries { + responseEntities := make([]DescribeClientQuotasEntity, len(responseEntry.Entities)) + for responseEntityIdx, responseEntity := range responseEntry.Entities { + responseEntities[responseEntityIdx] = DescribeClientQuotasEntity{ + EntityType: responseEntity.EntityType, + EntityName: responseEntity.EntityName, + } + } + + responseValues := make([]DescribeClientQuotasValue, len(responseEntry.Values)) + for responseValueIdx, responseValue := range responseEntry.Values { + responseValues[responseValueIdx] = DescribeClientQuotasValue{ + Key: responseValue.Key, + Value: responseValue.Value, + } + } + responseEntries[responseEntryIdx] = DescribeClientQuotasResponseQuotas{ + Entities: responseEntities, + Values: responseValues, + } + } + ret := &DescribeClientQuotasResponse{ + Throttle: time.Duration(res.ThrottleTimeMs), + Entries: responseEntries, + } + + return ret, nil +} diff --git a/vendor/github.com/segmentio/kafka-go/describeconfigs.go b/vendor/github.com/segmentio/kafka-go/describeconfigs.go index 4f5c09514..17f4f305f 100644 --- a/vendor/github.com/segmentio/kafka-go/describeconfigs.go +++ b/vendor/github.com/segmentio/kafka-go/describeconfigs.go @@ -14,7 +14,7 @@ type DescribeConfigsRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr - // List of resources to update. + // List of resources to get details for. Resources []DescribeConfigRequestResource // Ignored if API version is less than v1 diff --git a/vendor/github.com/segmentio/kafka-go/describeuserscramcredentials.go b/vendor/github.com/segmentio/kafka-go/describeuserscramcredentials.go new file mode 100644 index 000000000..7194ea1e0 --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/describeuserscramcredentials.go @@ -0,0 +1,97 @@ +package kafka + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/segmentio/kafka-go/protocol/describeuserscramcredentials" +) + +// DescribeUserScramCredentialsRequest represents a request sent to a kafka broker to +// describe user scram credentials. +type DescribeUserScramCredentialsRequest struct { + // Address of the kafka broker to send the request to. + Addr net.Addr + + // List of Scram users to describe + Users []UserScramCredentialsUser +} + +type UserScramCredentialsUser struct { + Name string +} + +// DescribeUserScramCredentialsResponse represents a response from a kafka broker to a describe user +// credentials request. +type DescribeUserScramCredentialsResponse struct { + // The amount of time that the broker throttled the request. + Throttle time.Duration + + // Top level error that occurred while attempting to describe + // the user scram credentials. + // + // The errors contain the kafka error code. Programs may use the standard + // errors.Is function to test the error against kafka error codes. + Error error + + // List of described user scram credentials. + Results []DescribeUserScramCredentialsResponseResult +} + +type DescribeUserScramCredentialsResponseResult struct { + User string + CredentialInfos []DescribeUserScramCredentialsCredentialInfo + Error error +} + +type DescribeUserScramCredentialsCredentialInfo struct { + Mechanism ScramMechanism + Iterations int +} + +// DescribeUserScramCredentials sends a user scram credentials describe request to a kafka broker and returns +// the response. +func (c *Client) DescribeUserScramCredentials(ctx context.Context, req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) { + users := make([]describeuserscramcredentials.RequestUser, len(req.Users)) + + for userIdx, user := range req.Users { + users[userIdx] = describeuserscramcredentials.RequestUser{ + Name: user.Name, + } + } + + m, err := c.roundTrip(ctx, req.Addr, &describeuserscramcredentials.Request{ + Users: users, + }) + if err != nil { + return nil, fmt.Errorf("kafka.(*Client).DescribeUserScramCredentials: %w", err) + } + + res := m.(*describeuserscramcredentials.Response) + responseResults := make([]DescribeUserScramCredentialsResponseResult, len(res.Results)) + + for responseIdx, responseResult := range res.Results { + credentialInfos := make([]DescribeUserScramCredentialsCredentialInfo, len(responseResult.CredentialInfos)) + + for credentialInfoIdx, credentialInfo := range responseResult.CredentialInfos { + credentialInfos[credentialInfoIdx] = DescribeUserScramCredentialsCredentialInfo{ + Mechanism: ScramMechanism(credentialInfo.Mechanism), + Iterations: int(credentialInfo.Iterations), + } + } + responseResults[responseIdx] = DescribeUserScramCredentialsResponseResult{ + User: responseResult.User, + CredentialInfos: credentialInfos, + Error: makeError(responseResult.ErrorCode, responseResult.ErrorMessage), + } + } + ret := &DescribeUserScramCredentialsResponse{ + Throttle: makeDuration(res.ThrottleTimeMs), + Error: makeError(res.ErrorCode, res.ErrorMessage), + Results: responseResults, + } + + return ret, nil +} diff --git a/vendor/github.com/segmentio/kafka-go/fetch.go b/vendor/github.com/segmentio/kafka-go/fetch.go index e682aeadb..eafd0de88 100644 --- a/vendor/github.com/segmentio/kafka-go/fetch.go +++ b/vendor/github.com/segmentio/kafka-go/fetch.go @@ -49,7 +49,7 @@ type FetchResponse struct { Topic string Partition int - // Informations about the topic partition layout returned from the broker. + // Information about the topic partition layout returned from the broker. // // LastStableOffset requires the kafka broker to support the Fetch API in // version 4 or above (otherwise the value is zero). diff --git a/vendor/github.com/segmentio/kafka-go/message.go b/vendor/github.com/segmentio/kafka-go/message.go index 5fb7b8ebe..0539e6038 100644 --- a/vendor/github.com/segmentio/kafka-go/message.go +++ b/vendor/github.com/segmentio/kafka-go/message.go @@ -20,6 +20,11 @@ type Message struct { Value []byte Headers []Header + // This field is used to hold arbitrary data you wish to include, so it + // will be available when handle it on the Writer's `Completion` method, + // this support the application can do any post operation on each message. + WriterData interface{} + // If not set at the creation, Time will be automatically set when // writing the message. Time time.Time @@ -44,6 +49,17 @@ func (msg *Message) size() int32 { return 4 + 1 + 1 + sizeofBytes(msg.Key) + sizeofBytes(msg.Value) + timestampSize } +func (msg *Message) headerSize() int { + return varArrayLen(len(msg.Headers), func(i int) int { + h := &msg.Headers[i] + return varStringLen(h.Key) + varBytesLen(h.Value) + }) +} + +func (msg *Message) totalSize() int32 { + return int32(msg.headerSize()) + msg.size() +} + type message struct { CRC int32 MagicByte int8 diff --git a/vendor/github.com/segmentio/kafka-go/message_reader.go b/vendor/github.com/segmentio/kafka-go/message_reader.go index 35e1067f2..a0a0385ef 100644 --- a/vendor/github.com/segmentio/kafka-go/message_reader.go +++ b/vendor/github.com/segmentio/kafka-go/message_reader.go @@ -22,7 +22,7 @@ type messageSetReader struct { // This is used to detect truncation of the response. lengthRemain int - decompressed bytes.Buffer + decompressed *bytes.Buffer } type readerStack struct { @@ -87,6 +87,7 @@ func newMessageSetReader(reader *bufio.Reader, remain int) (*messageSetReader, e reader: reader, remain: remain, }, + decompressed: acquireBuffer(), } err := res.readHeader() return res, err @@ -158,7 +159,9 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB if codec, err = r.header.compression(); err != nil { return } - r.log("Reading with codec=%T", codec) + if r.debug { + r.log("Reading with codec=%T", codec) + } if codec != nil { // discard next four bytes...will be -1 to indicate null key if err = r.discardN(4); err != nil { @@ -199,7 +202,7 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB // Allocate a buffer of size 0, which gets capped at 16 bytes // by the bufio package. We are already reading buffered data // here, no need to reserve another 4KB buffer. - reader: bufio.NewReaderSize(&r.decompressed, 0), + reader: bufio.NewReaderSize(r.decompressed, 0), remain: r.decompressed.Len(), base: offset, parent: r.readerStack, @@ -278,7 +281,7 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt } r.remain -= batchRemain - int(limitReader.N) r.readerStack = &readerStack{ - reader: bufio.NewReaderSize(&r.decompressed, 0), // the new stack reads from the decompressed buffer + reader: bufio.NewReaderSize(r.decompressed, 0), // the new stack reads from the decompressed buffer remain: r.decompressed.Len(), base: -1, // base is unused here parent: r.readerStack, @@ -351,14 +354,18 @@ func (r *messageSetReader) markRead() { } r.count-- r.unwindStack() - r.log("Mark read remain=%d", r.remain) + if r.debug { + r.log("Mark read remain=%d", r.remain) + } } func (r *messageSetReader) unwindStack() { for r.count == 0 { if r.remain == 0 { if r.parent != nil { - r.log("Popped reader stack") + if r.debug { + r.log("Popped reader stack") + } r.readerStack = r.parent continue } @@ -425,7 +432,9 @@ func (r *messageSetReader) readHeader() (err error) { // Set arbitrary non-zero length so that we always assume the // message is truncated since bytes remain. r.lengthRemain = 1 - r.log("Read v0 header with offset=%d len=%d magic=%d attributes=%d", r.header.firstOffset, r.header.length, r.header.magic, r.header.v1.attributes) + if r.debug { + r.log("Read v0 header with offset=%d len=%d magic=%d attributes=%d", r.header.firstOffset, r.header.length, r.header.magic, r.header.v1.attributes) + } case 1: r.header.crc = crcOrLeaderEpoch if err = r.readInt8(&r.header.v1.attributes); err != nil { @@ -438,7 +447,9 @@ func (r *messageSetReader) readHeader() (err error) { // Set arbitrary non-zero length so that we always assume the // message is truncated since bytes remain. r.lengthRemain = 1 - r.log("Read v1 header with remain=%d offset=%d magic=%d and attributes=%d", r.remain, r.header.firstOffset, r.header.magic, r.header.v1.attributes) + if r.debug { + r.log("Read v1 header with remain=%d offset=%d magic=%d and attributes=%d", r.remain, r.header.firstOffset, r.header.magic, r.header.v1.attributes) + } case 2: r.header.v2.leaderEpoch = crcOrLeaderEpoch if err = r.readInt32(&r.header.crc); err != nil { @@ -471,7 +482,9 @@ func (r *messageSetReader) readHeader() (err error) { r.count = int(r.header.v2.count) // Subtracts the header bytes from the length r.lengthRemain = int(r.header.length) - 49 - r.log("Read v2 header with count=%d offset=%d len=%d magic=%d attributes=%d", r.count, r.header.firstOffset, r.header.length, r.header.magic, r.header.v2.attributes) + if r.debug { + r.log("Read v2 header with count=%d offset=%d len=%d magic=%d attributes=%d", r.count, r.header.firstOffset, r.header.length, r.header.magic, r.header.v2.attributes) + } default: err = r.header.badMagic() return @@ -520,9 +533,7 @@ func (r *messageSetReader) readBytesWith(fn readBytesFunc) (err error) { } func (r *messageSetReader) log(msg string, args ...interface{}) { - if r.debug { - log.Printf("[DEBUG] "+msg, args...) - } + log.Printf("[DEBUG] "+msg, args...) } func extractOffset(base int64, msgSet []byte) (offset int64, err error) { diff --git a/vendor/github.com/segmentio/kafka-go/offsetfetch.go b/vendor/github.com/segmentio/kafka-go/offsetfetch.go index 61fcba2e3..b85bc5c83 100644 --- a/vendor/github.com/segmentio/kafka-go/offsetfetch.go +++ b/vendor/github.com/segmentio/kafka-go/offsetfetch.go @@ -66,19 +66,28 @@ type OffsetFetchPartition struct { // OffsetFetch sends an offset fetch request to a kafka broker and returns the // response. func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error) { - topics := make([]offsetfetch.RequestTopic, 0, len(req.Topics)) - for topicName, partitions := range req.Topics { - indexes := make([]int32, len(partitions)) + // Kafka version 0.10.2.x and above allow null Topics map for OffsetFetch API + // which will return the result for all topics with the desired consumer group: + // https://kafka.apache.org/0102/protocol.html#The_Messages_OffsetFetch + // For Kafka version below 0.10.2.x this call will result in an error + var topics []offsetfetch.RequestTopic - for i, p := range partitions { - indexes[i] = int32(p) - } + if len(req.Topics) > 0 { + topics = make([]offsetfetch.RequestTopic, 0, len(req.Topics)) + + for topicName, partitions := range req.Topics { + indexes := make([]int32, len(partitions)) - topics = append(topics, offsetfetch.RequestTopic{ - Name: topicName, - PartitionIndexes: indexes, - }) + for i, p := range partitions { + indexes[i] = int32(p) + } + + topics = append(topics, offsetfetch.RequestTopic{ + Name: topicName, + PartitionIndexes: indexes, + }) + } } m, err := c.roundTrip(ctx, req.Addr, &offsetfetch.Request{ diff --git a/vendor/github.com/segmentio/kafka-go/produce.go b/vendor/github.com/segmentio/kafka-go/produce.go index 1a196fe6b..72d1ed09b 100644 --- a/vendor/github.com/segmentio/kafka-go/produce.go +++ b/vendor/github.com/segmentio/kafka-go/produce.go @@ -111,26 +111,26 @@ type ProduceResponse struct { // Offset of the first record that was written to the topic partition. // - // This field will be zero if the kafka broker did no support the Produce - // API in version 3 or above. + // This field will be zero if the kafka broker did not support Produce API + // version 3 or above. BaseOffset int64 // Time at which the broker wrote the records to the topic partition. // - // This field will be zero if the kafka broker did no support the Produce - // API in version 2 or above. + // This field will be zero if the kafka broker did not support Produce API + // version 2 or above. LogAppendTime time.Time // First offset in the topic partition that the records were written to. // - // This field will be zero if the kafka broker did no support the Produce - // API in version 5 or above (or if the first offset is zero). + // This field will be zero if the kafka broker did not support Produce + // API version 5 or above (or if the first offset is zero). LogStartOffset int64 // If errors occurred writing specific records, they will be reported in // this map. // - // This field will always be empty if the kafka broker did no support the + // This field will always be empty if the kafka broker did not support the // Produce API in version 8 or above. RecordErrors map[int]error } diff --git a/vendor/github.com/segmentio/kafka-go/protocol/alterclientquotas/alterclientquotas.go b/vendor/github.com/segmentio/kafka-go/protocol/alterclientquotas/alterclientquotas.go new file mode 100644 index 000000000..c657d92ac --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/protocol/alterclientquotas/alterclientquotas.go @@ -0,0 +1,68 @@ +package alterclientquotas + +import "github.com/segmentio/kafka-go/protocol" + +func init() { + protocol.Register(&Request{}, &Response{}) +} + +// Detailed API definition: https://kafka.apache.org/protocol#The_Messages_AlterClientQuotas +type Request struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v1,max=v1,tag"` + Entries []Entry `kafka:"min=v0,max=v1"` + ValidateOnly bool `kafka:"min=v0,max=v1"` +} + +func (r *Request) ApiKey() protocol.ApiKey { return protocol.AlterClientQuotas } + +func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { + return cluster.Brokers[cluster.Controller], nil +} + +type Entry struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v1,max=v1,tag"` + Entities []Entity `kafka:"min=v0,max=v1"` + Ops []Ops `kafka:"min=v0,max=v1"` +} + +type Entity struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v1,max=v1,tag"` + EntityType string `kafka:"min=v0,max=v0|min=v1,max=v1,compact"` + EntityName string `kafka:"min=v0,max=v0,nullable|min=v1,max=v1,nullable,compact"` +} + +type Ops struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v1,max=v1,tag"` + Key string `kafka:"min=v0,max=v0|min=v1,max=v1,compact"` + Value float64 `kafka:"min=v0,max=v1"` + Remove bool `kafka:"min=v0,max=v1"` +} + +type Response struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v1,max=v1,tag"` + ThrottleTimeMs int32 `kafka:"min=v0,max=v1"` + Results []ResponseQuotas `kafka:"min=v0,max=v1"` +} + +func (r *Response) ApiKey() protocol.ApiKey { return protocol.AlterClientQuotas } + +type ResponseQuotas struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v1,max=v1,tag"` + ErrorCode int16 `kafka:"min=v0,max=v1"` + ErrorMessage string `kafka:"min=v0,max=v1,nullable"` + Entities []Entity `kafka:"min=v0,max=v1"` +} + +var _ protocol.BrokerMessage = (*Request)(nil) diff --git a/vendor/github.com/segmentio/kafka-go/protocol/alteruserscramcredentials/alteruserscramcredentials.go b/vendor/github.com/segmentio/kafka-go/protocol/alteruserscramcredentials/alteruserscramcredentials.go new file mode 100644 index 000000000..b5369be20 --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/protocol/alteruserscramcredentials/alteruserscramcredentials.go @@ -0,0 +1,66 @@ +package alteruserscramcredentials + +import "github.com/segmentio/kafka-go/protocol" + +func init() { + protocol.Register(&Request{}, &Response{}) +} + +type Request struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + Deletions []RequestUserScramCredentialsDeletion `kafka:"min=v0,max=v0"` + Upsertions []RequestUserScramCredentialsUpsertion `kafka:"min=v0,max=v0"` +} + +func (r *Request) ApiKey() protocol.ApiKey { return protocol.AlterUserScramCredentials } + +func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { + return cluster.Brokers[cluster.Controller], nil +} + +type RequestUserScramCredentialsDeletion struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + Name string `kafka:"min=v0,max=v0,compact"` + Mechanism int8 `kafka:"min=v0,max=v0"` +} + +type RequestUserScramCredentialsUpsertion struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + Name string `kafka:"min=v0,max=v0,compact"` + Mechanism int8 `kafka:"min=v0,max=v0"` + Iterations int32 `kafka:"min=v0,max=v0"` + Salt []byte `kafka:"min=v0,max=v0,compact"` + SaltedPassword []byte `kafka:"min=v0,max=v0,compact"` +} + +type Response struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + ThrottleTimeMs int32 `kafka:"min=v0,max=v0"` + Results []ResponseUserScramCredentials `kafka:"min=v0,max=v0"` +} + +func (r *Response) ApiKey() protocol.ApiKey { return protocol.AlterUserScramCredentials } + +type ResponseUserScramCredentials struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + User string `kafka:"min=v0,max=v0,compact"` + ErrorCode int16 `kafka:"min=v0,max=v0"` + ErrorMessage string `kafka:"min=v0,max=v0,nullable"` +} + +var _ protocol.BrokerMessage = (*Request)(nil) diff --git a/vendor/github.com/segmentio/kafka-go/protocol/createacls/createacls.go b/vendor/github.com/segmentio/kafka-go/protocol/createacls/createacls.go index 893be44dd..aad0cc07c 100644 --- a/vendor/github.com/segmentio/kafka-go/protocol/createacls/createacls.go +++ b/vendor/github.com/segmentio/kafka-go/protocol/createacls/createacls.go @@ -9,9 +9,9 @@ func init() { type Request struct { // We need at least one tagged field to indicate that v2+ uses "flexible" // messages. - _ struct{} `kafka:"min=v2,max=v2,tag"` + _ struct{} `kafka:"min=v2,max=v3,tag"` - Creations []RequestACLs `kafka:"min=v0,max=v2"` + Creations []RequestACLs `kafka:"min=v0,max=v3"` } func (r *Request) ApiKey() protocol.ApiKey { return protocol.CreateAcls } @@ -21,29 +21,37 @@ func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { } type RequestACLs struct { - ResourceType int8 `kafka:"min=v0,max=v2"` - ResourceName string `kafka:"min=v0,max=v2"` - ResourcePatternType int8 `kafka:"min=v0,max=v2"` - Principal string `kafka:"min=v0,max=v2"` - Host string `kafka:"min=v0,max=v2"` - Operation int8 `kafka:"min=v0,max=v2"` - PermissionType int8 `kafka:"min=v0,max=v2"` + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ResourceType int8 `kafka:"min=v0,max=v3"` + ResourceName string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + ResourcePatternType int8 `kafka:"min=v1,max=v3"` + Principal string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + Host string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + Operation int8 `kafka:"min=v0,max=v3"` + PermissionType int8 `kafka:"min=v0,max=v3"` } type Response struct { // We need at least one tagged field to indicate that v2+ uses "flexible" // messages. - _ struct{} `kafka:"min=v2,max=v2,tag"` + _ struct{} `kafka:"min=v2,max=v3,tag"` - ThrottleTimeMs int32 `kafka:"min=v0,max=v2"` - Results []ResponseACLs `kafka:"min=v0,max=v2"` + ThrottleTimeMs int32 `kafka:"min=v0,max=v3"` + Results []ResponseACLs `kafka:"min=v0,max=v3"` } func (r *Response) ApiKey() protocol.ApiKey { return protocol.CreateAcls } type ResponseACLs struct { - ErrorCode int16 `kafka:"min=v0,max=v2"` - ErrorMessage string `kafka:"min=v0,max=v2,nullable"` + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ErrorCode int16 `kafka:"min=v0,max=v3"` + ErrorMessage string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` } var _ protocol.BrokerMessage = (*Request)(nil) diff --git a/vendor/github.com/segmentio/kafka-go/protocol/decode.go b/vendor/github.com/segmentio/kafka-go/protocol/decode.go index 690f30a07..5bf61ffa4 100644 --- a/vendor/github.com/segmentio/kafka-go/protocol/decode.go +++ b/vendor/github.com/segmentio/kafka-go/protocol/decode.go @@ -7,6 +7,7 @@ import ( "hash/crc32" "io" "io/ioutil" + "math" "reflect" "sync" "sync/atomic" @@ -85,6 +86,10 @@ func (d *decoder) decodeInt64(v value) { v.setInt64(d.readInt64()) } +func (d *decoder) decodeFloat64(v value) { + v.setFloat64(d.readFloat64()) +} + func (d *decoder) decodeString(v value) { v.setString(d.readString()) } @@ -216,6 +221,13 @@ func (d *decoder) readInt64() int64 { return 0 } +func (d *decoder) readFloat64() float64 { + if d.readFull(d.buffer[:8]) { + return readFloat64(d.buffer[:8]) + } + return 0 +} + func (d *decoder) readString() string { if n := d.readInt16(); n < 0 { return "" @@ -342,6 +354,8 @@ func decodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) return (*decoder).decodeInt32 case reflect.Int64: return (*decoder).decodeInt64 + case reflect.Float64: + return (*decoder).decodeFloat64 case reflect.String: return stringDecodeFuncOf(flexible, tag) case reflect.Struct: @@ -469,6 +483,10 @@ func readInt64(b []byte) int64 { return int64(binary.BigEndian.Uint64(b)) } +func readFloat64(b []byte) float64 { + return math.Float64frombits(binary.BigEndian.Uint64(b)) +} + func Unmarshal(data []byte, version int16, value interface{}) error { typ := elemTypeOf(value) cache, _ := unmarshalers.Load().(map[versionedType]decodeFunc) diff --git a/vendor/github.com/segmentio/kafka-go/protocol/deleteacls/deleteacls.go b/vendor/github.com/segmentio/kafka-go/protocol/deleteacls/deleteacls.go new file mode 100644 index 000000000..7f0f002f3 --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/protocol/deleteacls/deleteacls.go @@ -0,0 +1,74 @@ +package deleteacls + +import "github.com/segmentio/kafka-go/protocol" + +func init() { + protocol.Register(&Request{}, &Response{}) +} + +type Request struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + Filters []RequestFilter `kafka:"min=v0,max=v3"` +} + +func (r *Request) ApiKey() protocol.ApiKey { return protocol.DeleteAcls } + +func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { + return cluster.Brokers[cluster.Controller], nil +} + +type RequestFilter struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ResourceTypeFilter int8 `kafka:"min=v0,max=v3"` + ResourceNameFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + ResourcePatternTypeFilter int8 `kafka:"min=v1,max=v3"` + PrincipalFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + HostFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + Operation int8 `kafka:"min=v0,max=v3"` + PermissionType int8 `kafka:"min=v0,max=v3"` +} + +type Response struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ThrottleTimeMs int32 `kafka:"min=v0,max=v3"` + FilterResults []FilterResult `kafka:"min=v0,max=v3"` +} + +func (r *Response) ApiKey() protocol.ApiKey { return protocol.DeleteAcls } + +type FilterResult struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ErrorCode int16 `kafka:"min=v0,max=v3"` + ErrorMessage string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + MatchingACLs []MatchingACL `kafka:"min=v0,max=v3"` +} + +type MatchingACL struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ErrorCode int16 `kafka:"min=v0,max=v3"` + ErrorMessage string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + ResourceType int8 `kafka:"min=v0,max=v3"` + ResourceName string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + ResourcePatternType int8 `kafka:"min=v1,max=v3"` + Principal string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + Host string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + Operation int8 `kafka:"min=v0,max=v3"` + PermissionType int8 `kafka:"min=v0,max=v3"` +} + +var _ protocol.BrokerMessage = (*Request)(nil) diff --git a/vendor/github.com/segmentio/kafka-go/protocol/deletegroups/deletegroups.go b/vendor/github.com/segmentio/kafka-go/protocol/deletegroups/deletegroups.go new file mode 100644 index 000000000..759dfc2fe --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/protocol/deletegroups/deletegroups.go @@ -0,0 +1,45 @@ +package deletegroups + +import "github.com/segmentio/kafka-go/protocol" + +func init() { + protocol.Register(&Request{}, &Response{}) +} + +type Request struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v2,max=v2,tag"` + + GroupIDs []string `kafka:"min=v0,max=v2"` +} + +func (r *Request) Group() string { + // use first group to determine group coordinator + if len(r.GroupIDs) > 0 { + return r.GroupIDs[0] + } + return "" +} + +func (r *Request) ApiKey() protocol.ApiKey { return protocol.DeleteGroups } + +var ( + _ protocol.GroupMessage = (*Request)(nil) +) + +type Response struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v2,max=v2,tag"` + + ThrottleTimeMs int32 `kafka:"min=v0,max=v2"` + Responses []ResponseGroup `kafka:"min=v0,max=v2"` +} + +func (r *Response) ApiKey() protocol.ApiKey { return protocol.DeleteGroups } + +type ResponseGroup struct { + GroupID string `kafka:"min=v0,max=v2"` + ErrorCode int16 `kafka:"min=v0,max=v2"` +} diff --git a/vendor/github.com/segmentio/kafka-go/protocol/describeacls/describeacls.go b/vendor/github.com/segmentio/kafka-go/protocol/describeacls/describeacls.go new file mode 100644 index 000000000..93a7d2ed7 --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/protocol/describeacls/describeacls.go @@ -0,0 +1,72 @@ +package describeacls + +import "github.com/segmentio/kafka-go/protocol" + +func init() { + protocol.Register(&Request{}, &Response{}) +} + +type Request struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + Filter ACLFilter `kafka:"min=v0,max=v3"` +} + +func (r *Request) ApiKey() protocol.ApiKey { return protocol.DescribeAcls } + +func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { + return cluster.Brokers[cluster.Controller], nil +} + +type ACLFilter struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ResourceTypeFilter int8 `kafka:"min=v0,max=v3"` + ResourceNameFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + ResourcePatternTypeFilter int8 `kafka:"min=v1,max=v3"` + PrincipalFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + HostFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + Operation int8 `kafka:"min=v0,max=v3"` + PermissionType int8 `kafka:"min=v0,max=v3"` +} + +type Response struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ThrottleTimeMs int32 `kafka:"min=v0,max=v3"` + ErrorCode int16 `kafka:"min=v0,max=v3"` + ErrorMessage string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + Resources []Resource `kafka:"min=v0,max=v3"` +} + +func (r *Response) ApiKey() protocol.ApiKey { return protocol.DescribeAcls } + +type Resource struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ResourceType int8 `kafka:"min=v0,max=v3"` + ResourceName string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + PatternType int8 `kafka:"min=v1,max=v3"` + ACLs []ResponseACL `kafka:"min=v0,max=v3"` +} + +type ResponseACL struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + Principal string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + Host string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + Operation int8 `kafka:"min=v0,max=v3"` + PermissionType int8 `kafka:"min=v0,max=v3"` +} + +var _ protocol.BrokerMessage = (*Request)(nil) diff --git a/vendor/github.com/segmentio/kafka-go/protocol/describeclientquotas/describeclientquotas.go b/vendor/github.com/segmentio/kafka-go/protocol/describeclientquotas/describeclientquotas.go new file mode 100644 index 000000000..e137776bf --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/protocol/describeclientquotas/describeclientquotas.go @@ -0,0 +1,68 @@ +package describeclientquotas + +import "github.com/segmentio/kafka-go/protocol" + +func init() { + protocol.Register(&Request{}, &Response{}) +} + +type Request struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v1,max=v1,tag"` + Components []Component `kafka:"min=v0,max=v1"` + Strict bool `kafka:"min=v0,max=v1"` +} + +func (r *Request) ApiKey() protocol.ApiKey { return protocol.DescribeClientQuotas } + +func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { + return cluster.Brokers[cluster.Controller], nil +} + +type Component struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v1,max=v1,tag"` + EntityType string `kafka:"min=v0,max=v1"` + MatchType int8 `kafka:"min=v0,max=v1"` + Match string `kafka:"min=v0,max=v1,nullable"` +} + +type Response struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v1,max=v1,tag"` + ThrottleTimeMs int32 `kafka:"min=v0,max=v1"` + ErrorCode int16 `kafka:"min=v0,max=v1"` + ErrorMessage string `kafka:"min=v0,max=v0,nullable|min=v1,max=v1,nullable,compact"` + Entries []ResponseQuotas `kafka:"min=v0,max=v1"` +} + +func (r *Response) ApiKey() protocol.ApiKey { return protocol.DescribeClientQuotas } + +type Entity struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v1,max=v1,tag"` + EntityType string `kafka:"min=v0,max=v0|min=v1,max=v1,compact"` + EntityName string `kafka:"min=v0,max=v0,nullable|min=v1,max=v1,nullable,compact"` +} + +type Value struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v1,max=v1,tag"` + Key string `kafka:"min=v0,max=v0|min=v1,max=v1,compact"` + Value float64 `kafka:"min=v0,max=v1"` +} + +type ResponseQuotas struct { + // We need at least one tagged field to indicate that this is a "flexible" message + // type. + _ struct{} `kafka:"min=v1,max=v1,tag"` + Entities []Entity `kafka:"min=v0,max=v1"` + Values []Value `kafka:"min=v0,max=v1"` +} + +var _ protocol.BrokerMessage = (*Request)(nil) diff --git a/vendor/github.com/segmentio/kafka-go/protocol/describeuserscramcredentials/describeuserscramcredentials.go b/vendor/github.com/segmentio/kafka-go/protocol/describeuserscramcredentials/describeuserscramcredentials.go new file mode 100644 index 000000000..e923b9a11 --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/protocol/describeuserscramcredentials/describeuserscramcredentials.go @@ -0,0 +1,64 @@ +package describeuserscramcredentials + +import "github.com/segmentio/kafka-go/protocol" + +func init() { + protocol.Register(&Request{}, &Response{}) +} + +type Request struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + Users []RequestUser `kafka:"min=v0,max=v0"` +} + +func (r *Request) ApiKey() protocol.ApiKey { return protocol.DescribeUserScramCredentials } + +func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { + return cluster.Brokers[cluster.Controller], nil +} + +type RequestUser struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + Name string `kafka:"min=v0,max=v0,compact"` +} + +type Response struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + ThrottleTimeMs int32 `kafka:"min=v0,max=v0"` + ErrorCode int16 `kafka:"min=v0,max=v0"` + ErrorMessage string `kafka:"min=v0,max=v0,nullable"` + Results []ResponseResult `kafka:"min=v0,max=v0"` +} + +func (r *Response) ApiKey() protocol.ApiKey { return protocol.DescribeUserScramCredentials } + +type ResponseResult struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + User string `kafka:"min=v0,max=v0,compact"` + ErrorCode int16 `kafka:"min=v0,max=v0"` + ErrorMessage string `kafka:"min=v0,max=v0,nullable"` + CredentialInfos []CredentialInfo `kafka:"min=v0,max=v0"` +} + +type CredentialInfo struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v0,max=v0,tag"` + + Mechanism int8 `kafka:"min=v0,max=v0"` + Iterations int32 `kafka:"min=v0,max=v0"` +} + +var _ protocol.BrokerMessage = (*Request)(nil) diff --git a/vendor/github.com/segmentio/kafka-go/protocol/encode.go b/vendor/github.com/segmentio/kafka-go/protocol/encode.go index a483a81ec..bd1633671 100644 --- a/vendor/github.com/segmentio/kafka-go/protocol/encode.go +++ b/vendor/github.com/segmentio/kafka-go/protocol/encode.go @@ -6,6 +6,7 @@ import ( "fmt" "hash/crc32" "io" + "math" "reflect" "sync" "sync/atomic" @@ -129,6 +130,10 @@ func (e *encoder) encodeInt64(v value) { e.writeInt64(v.int64()) } +func (e *encoder) encodeFloat64(v value) { + e.writeFloat64(v.float64()) +} + func (e *encoder) encodeString(v value) { e.writeString(v.string()) } @@ -230,6 +235,11 @@ func (e *encoder) writeInt64(i int64) { e.Write(e.buffer[:8]) } +func (e *encoder) writeFloat64(f float64) { + writeFloat64(e.buffer[:8], f) + e.Write(e.buffer[:8]) +} + func (e *encoder) writeString(s string) { e.writeInt16(int16(len(s))) e.WriteString(s) @@ -378,6 +388,8 @@ func encodeFuncOf(typ reflect.Type, version int16, flexible bool, tag structTag) return (*encoder).encodeInt32 case reflect.Int64: return (*encoder).encodeInt64 + case reflect.Float64: + return (*encoder).encodeFloat64 case reflect.String: return stringEncodeFuncOf(flexible, tag) case reflect.Struct: @@ -530,6 +542,10 @@ func writeInt64(b []byte, i int64) { binary.BigEndian.PutUint64(b, uint64(i)) } +func writeFloat64(b []byte, f float64) { + binary.BigEndian.PutUint64(b, math.Float64bits(f)) +} + func Marshal(version int16, value interface{}) ([]byte, error) { typ := typeOf(value) cache, _ := marshalers.Load().(map[versionedType]encodeFunc) diff --git a/vendor/github.com/segmentio/kafka-go/protocol/offsetfetch/offsetfetch.go b/vendor/github.com/segmentio/kafka-go/protocol/offsetfetch/offsetfetch.go index 011003340..8f1096f5d 100644 --- a/vendor/github.com/segmentio/kafka-go/protocol/offsetfetch/offsetfetch.go +++ b/vendor/github.com/segmentio/kafka-go/protocol/offsetfetch/offsetfetch.go @@ -8,7 +8,7 @@ func init() { type Request struct { GroupID string `kafka:"min=v0,max=v5"` - Topics []RequestTopic `kafka:"min=v0,max=v5"` + Topics []RequestTopic `kafka:"min=v0,max=v5,nullable"` } func (r *Request) ApiKey() protocol.ApiKey { return protocol.OffsetFetch } diff --git a/vendor/github.com/segmentio/kafka-go/protocol/protocol.go b/vendor/github.com/segmentio/kafka-go/protocol/protocol.go index f5f536148..ebf91a798 100644 --- a/vendor/github.com/segmentio/kafka-go/protocol/protocol.go +++ b/vendor/github.com/segmentio/kafka-go/protocol/protocol.go @@ -53,111 +53,115 @@ func (k ApiKey) apiType() apiType { } const ( - Produce ApiKey = 0 - Fetch ApiKey = 1 - ListOffsets ApiKey = 2 - Metadata ApiKey = 3 - LeaderAndIsr ApiKey = 4 - StopReplica ApiKey = 5 - UpdateMetadata ApiKey = 6 - ControlledShutdown ApiKey = 7 - OffsetCommit ApiKey = 8 - OffsetFetch ApiKey = 9 - FindCoordinator ApiKey = 10 - JoinGroup ApiKey = 11 - Heartbeat ApiKey = 12 - LeaveGroup ApiKey = 13 - SyncGroup ApiKey = 14 - DescribeGroups ApiKey = 15 - ListGroups ApiKey = 16 - SaslHandshake ApiKey = 17 - ApiVersions ApiKey = 18 - CreateTopics ApiKey = 19 - DeleteTopics ApiKey = 20 - DeleteRecords ApiKey = 21 - InitProducerId ApiKey = 22 - OffsetForLeaderEpoch ApiKey = 23 - AddPartitionsToTxn ApiKey = 24 - AddOffsetsToTxn ApiKey = 25 - EndTxn ApiKey = 26 - WriteTxnMarkers ApiKey = 27 - TxnOffsetCommit ApiKey = 28 - DescribeAcls ApiKey = 29 - CreateAcls ApiKey = 30 - DeleteAcls ApiKey = 31 - DescribeConfigs ApiKey = 32 - AlterConfigs ApiKey = 33 - AlterReplicaLogDirs ApiKey = 34 - DescribeLogDirs ApiKey = 35 - SaslAuthenticate ApiKey = 36 - CreatePartitions ApiKey = 37 - CreateDelegationToken ApiKey = 38 - RenewDelegationToken ApiKey = 39 - ExpireDelegationToken ApiKey = 40 - DescribeDelegationToken ApiKey = 41 - DeleteGroups ApiKey = 42 - ElectLeaders ApiKey = 43 - IncrementalAlterConfigs ApiKey = 44 - AlterPartitionReassignments ApiKey = 45 - ListPartitionReassignments ApiKey = 46 - OffsetDelete ApiKey = 47 - DescribeClientQuotas ApiKey = 48 - AlterClientQuotas ApiKey = 49 - - numApis = 50 + Produce ApiKey = 0 + Fetch ApiKey = 1 + ListOffsets ApiKey = 2 + Metadata ApiKey = 3 + LeaderAndIsr ApiKey = 4 + StopReplica ApiKey = 5 + UpdateMetadata ApiKey = 6 + ControlledShutdown ApiKey = 7 + OffsetCommit ApiKey = 8 + OffsetFetch ApiKey = 9 + FindCoordinator ApiKey = 10 + JoinGroup ApiKey = 11 + Heartbeat ApiKey = 12 + LeaveGroup ApiKey = 13 + SyncGroup ApiKey = 14 + DescribeGroups ApiKey = 15 + ListGroups ApiKey = 16 + SaslHandshake ApiKey = 17 + ApiVersions ApiKey = 18 + CreateTopics ApiKey = 19 + DeleteTopics ApiKey = 20 + DeleteRecords ApiKey = 21 + InitProducerId ApiKey = 22 + OffsetForLeaderEpoch ApiKey = 23 + AddPartitionsToTxn ApiKey = 24 + AddOffsetsToTxn ApiKey = 25 + EndTxn ApiKey = 26 + WriteTxnMarkers ApiKey = 27 + TxnOffsetCommit ApiKey = 28 + DescribeAcls ApiKey = 29 + CreateAcls ApiKey = 30 + DeleteAcls ApiKey = 31 + DescribeConfigs ApiKey = 32 + AlterConfigs ApiKey = 33 + AlterReplicaLogDirs ApiKey = 34 + DescribeLogDirs ApiKey = 35 + SaslAuthenticate ApiKey = 36 + CreatePartitions ApiKey = 37 + CreateDelegationToken ApiKey = 38 + RenewDelegationToken ApiKey = 39 + ExpireDelegationToken ApiKey = 40 + DescribeDelegationToken ApiKey = 41 + DeleteGroups ApiKey = 42 + ElectLeaders ApiKey = 43 + IncrementalAlterConfigs ApiKey = 44 + AlterPartitionReassignments ApiKey = 45 + ListPartitionReassignments ApiKey = 46 + OffsetDelete ApiKey = 47 + DescribeClientQuotas ApiKey = 48 + AlterClientQuotas ApiKey = 49 + DescribeUserScramCredentials ApiKey = 50 + AlterUserScramCredentials ApiKey = 51 + + numApis = 52 ) var apiNames = [numApis]string{ - Produce: "Produce", - Fetch: "Fetch", - ListOffsets: "ListOffsets", - Metadata: "Metadata", - LeaderAndIsr: "LeaderAndIsr", - StopReplica: "StopReplica", - UpdateMetadata: "UpdateMetadata", - ControlledShutdown: "ControlledShutdown", - OffsetCommit: "OffsetCommit", - OffsetFetch: "OffsetFetch", - FindCoordinator: "FindCoordinator", - JoinGroup: "JoinGroup", - Heartbeat: "Heartbeat", - LeaveGroup: "LeaveGroup", - SyncGroup: "SyncGroup", - DescribeGroups: "DescribeGroups", - ListGroups: "ListGroups", - SaslHandshake: "SaslHandshake", - ApiVersions: "ApiVersions", - CreateTopics: "CreateTopics", - DeleteTopics: "DeleteTopics", - DeleteRecords: "DeleteRecords", - InitProducerId: "InitProducerId", - OffsetForLeaderEpoch: "OffsetForLeaderEpoch", - AddPartitionsToTxn: "AddPartitionsToTxn", - AddOffsetsToTxn: "AddOffsetsToTxn", - EndTxn: "EndTxn", - WriteTxnMarkers: "WriteTxnMarkers", - TxnOffsetCommit: "TxnOffsetCommit", - DescribeAcls: "DescribeAcls", - CreateAcls: "CreateAcls", - DeleteAcls: "DeleteAcls", - DescribeConfigs: "DescribeConfigs", - AlterConfigs: "AlterConfigs", - AlterReplicaLogDirs: "AlterReplicaLogDirs", - DescribeLogDirs: "DescribeLogDirs", - SaslAuthenticate: "SaslAuthenticate", - CreatePartitions: "CreatePartitions", - CreateDelegationToken: "CreateDelegationToken", - RenewDelegationToken: "RenewDelegationToken", - ExpireDelegationToken: "ExpireDelegationToken", - DescribeDelegationToken: "DescribeDelegationToken", - DeleteGroups: "DeleteGroups", - ElectLeaders: "ElectLeaders", - IncrementalAlterConfigs: "IncrementalAlterConfigs", - AlterPartitionReassignments: "AlterPartitionReassignments", - ListPartitionReassignments: "ListPartitionReassignments", - OffsetDelete: "OffsetDelete", - DescribeClientQuotas: "DescribeClientQuotas", - AlterClientQuotas: "AlterClientQuotas", + Produce: "Produce", + Fetch: "Fetch", + ListOffsets: "ListOffsets", + Metadata: "Metadata", + LeaderAndIsr: "LeaderAndIsr", + StopReplica: "StopReplica", + UpdateMetadata: "UpdateMetadata", + ControlledShutdown: "ControlledShutdown", + OffsetCommit: "OffsetCommit", + OffsetFetch: "OffsetFetch", + FindCoordinator: "FindCoordinator", + JoinGroup: "JoinGroup", + Heartbeat: "Heartbeat", + LeaveGroup: "LeaveGroup", + SyncGroup: "SyncGroup", + DescribeGroups: "DescribeGroups", + ListGroups: "ListGroups", + SaslHandshake: "SaslHandshake", + ApiVersions: "ApiVersions", + CreateTopics: "CreateTopics", + DeleteTopics: "DeleteTopics", + DeleteRecords: "DeleteRecords", + InitProducerId: "InitProducerId", + OffsetForLeaderEpoch: "OffsetForLeaderEpoch", + AddPartitionsToTxn: "AddPartitionsToTxn", + AddOffsetsToTxn: "AddOffsetsToTxn", + EndTxn: "EndTxn", + WriteTxnMarkers: "WriteTxnMarkers", + TxnOffsetCommit: "TxnOffsetCommit", + DescribeAcls: "DescribeAcls", + CreateAcls: "CreateAcls", + DeleteAcls: "DeleteAcls", + DescribeConfigs: "DescribeConfigs", + AlterConfigs: "AlterConfigs", + AlterReplicaLogDirs: "AlterReplicaLogDirs", + DescribeLogDirs: "DescribeLogDirs", + SaslAuthenticate: "SaslAuthenticate", + CreatePartitions: "CreatePartitions", + CreateDelegationToken: "CreateDelegationToken", + RenewDelegationToken: "RenewDelegationToken", + ExpireDelegationToken: "ExpireDelegationToken", + DescribeDelegationToken: "DescribeDelegationToken", + DeleteGroups: "DeleteGroups", + ElectLeaders: "ElectLeaders", + IncrementalAlterConfigs: "IncrementalAlterConfigs", + AlterPartitionReassignments: "AlterPartitionReassignments", + ListPartitionReassignments: "ListPartitionReassignments", + OffsetDelete: "OffsetDelete", + DescribeClientQuotas: "DescribeClientQuotas", + AlterClientQuotas: "AlterClientQuotas", + DescribeUserScramCredentials: "DescribeUserScramCredentials", + AlterUserScramCredentials: "AlterUserScramCredentials", } type messageType struct { diff --git a/vendor/github.com/segmentio/kafka-go/protocol/reflect.go b/vendor/github.com/segmentio/kafka-go/protocol/reflect.go index 910fd6219..4d664b26b 100644 --- a/vendor/github.com/segmentio/kafka-go/protocol/reflect.go +++ b/vendor/github.com/segmentio/kafka-go/protocol/reflect.go @@ -45,6 +45,8 @@ func (v value) int32() int32 { return int32(v.int64()) } func (v value) int64() int64 { return v.val.Int() } +func (v value) float64() float64 { return v.val.Float() } + func (v value) string() string { return v.val.String() } func (v value) bytes() []byte { return v.val.Bytes() } @@ -63,6 +65,8 @@ func (v value) setInt32(i int32) { v.setInt64(int64(i)) } func (v value) setInt64(i int64) { v.val.SetInt(i) } +func (v value) setFloat64(f float64) { v.val.SetFloat(f) } + func (v value) setString(s string) { v.val.SetString(s) } func (v value) setBytes(b []byte) { v.val.SetBytes(b) } diff --git a/vendor/github.com/segmentio/kafka-go/protocol/reflect_unsafe.go b/vendor/github.com/segmentio/kafka-go/protocol/reflect_unsafe.go index 0e8397242..9eca5060f 100644 --- a/vendor/github.com/segmentio/kafka-go/protocol/reflect_unsafe.go +++ b/vendor/github.com/segmentio/kafka-go/protocol/reflect_unsafe.go @@ -63,6 +63,8 @@ func (v value) int32() int32 { return *(*int32)(v.ptr) } func (v value) int64() int64 { return *(*int64)(v.ptr) } +func (v value) float64() float64 { return *(*float64)(v.ptr) } + func (v value) string() string { return *(*string)(v.ptr) } func (v value) bytes() []byte { return *(*[]byte)(v.ptr) } @@ -92,6 +94,8 @@ func (v value) setInt32(i int32) { *(*int32)(v.ptr) = i } func (v value) setInt64(i int64) { *(*int64)(v.ptr) = i } +func (v value) setFloat64(f float64) { *(*float64)(v.ptr) = f } + func (v value) setString(s string) { *(*string)(v.ptr) = s } func (v value) setBytes(b []byte) { *(*[]byte)(v.ptr) = b } diff --git a/vendor/github.com/segmentio/kafka-go/reader.go b/vendor/github.com/segmentio/kafka-go/reader.go index 8e46c338f..1acb676e9 100644 --- a/vendor/github.com/segmentio/kafka-go/reader.go +++ b/vendor/github.com/segmentio/kafka-go/reader.go @@ -19,7 +19,7 @@ const ( ) const ( - // defaultCommitRetries holds the number commit attempts to make + // defaultCommitRetries holds the number of commit attempts to make // before giving up. defaultCommitRetries = 3 ) @@ -238,7 +238,7 @@ func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) { commit := func() { if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil { - r.withErrorLogger(func(l Logger) { l.Printf(err.Error()) }) + r.withErrorLogger(func(l Logger) { l.Printf("%v", err) }) } else { offsets.reset() } @@ -311,7 +311,7 @@ func (r *Reader) run(cg *ConsumerGroup) { } r.stats.errors.observe(1) r.withErrorLogger(func(l Logger) { - l.Printf(err.Error()) + l.Printf("%v", err) }) // Continue with next attempt... } @@ -510,7 +510,7 @@ type ReaderConfig struct { // non-transactional and committed records are visible. IsolationLevel IsolationLevel - // Limit of how many attempts will be made before delivering the error. + // Limit of how many attempts to connect will be made before returning the error. // // The default is to try 3 times. MaxAttempts int @@ -785,7 +785,7 @@ func (r *Reader) Close() error { // offset when called. Note that this could result in an offset being committed // before the message is fully processed. // -// If more fine grained control of when offsets are committed is required, it +// If more fine-grained control of when offsets are committed is required, it // is recommended to use FetchMessage with CommitMessages instead. func (r *Reader) ReadMessage(ctx context.Context) (Message, error) { m, err := r.FetchMessage(ctx) @@ -1220,7 +1220,7 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) { } // A reader reads messages from kafka and produces them on its channels, it's -// used as an way to asynchronously fetch messages while the main program reads +// used as a way to asynchronously fetch messages while the main program reads // them using the high level reader API. type reader struct { dialer *Dialer @@ -1346,7 +1346,7 @@ func (r *reader) run(ctx context.Context, offset int64) { case errors.Is(err, UnknownTopicOrPartition): r.withErrorLogger(func(log Logger) { - log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, toHumanOffset(offset), r.brokers) + log.Printf("failed to read from current broker %v for partition %d of %s at offset %d: %v", r.brokers, r.partition, r.topic, toHumanOffset(offset), err) }) conn.Close() @@ -1358,7 +1358,7 @@ func (r *reader) run(ctx context.Context, offset int64) { case errors.Is(err, NotLeaderForPartition): r.withErrorLogger(func(log Logger) { - log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, toHumanOffset(offset)) + log.Printf("failed to read from current broker for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err) }) conn.Close() @@ -1372,7 +1372,7 @@ func (r *reader) run(ctx context.Context, offset int64) { // Timeout on the kafka side, this can be safely retried. errcount = 0 r.withLogger(func(log Logger) { - log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, toHumanOffset(offset)) + log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err) }) r.stats.timeouts.observe(1) continue diff --git a/vendor/github.com/segmentio/kafka-go/sasl/scram/scram.go b/vendor/github.com/segmentio/kafka-go/sasl/scram/scram.go index bc2b28ed2..b29885f32 100644 --- a/vendor/github.com/segmentio/kafka-go/sasl/scram/scram.go +++ b/vendor/github.com/segmentio/kafka-go/sasl/scram/scram.go @@ -7,7 +7,7 @@ import ( "hash" "github.com/segmentio/kafka-go/sasl" - "github.com/xdg/scram" + "github.com/xdg-go/scram" ) // Algorithm determines the hash function used by SCRAM to protect the user's diff --git a/vendor/github.com/segmentio/kafka-go/stats.go b/vendor/github.com/segmentio/kafka-go/stats.go index aec9df5e2..ef1e582cb 100644 --- a/vendor/github.com/segmentio/kafka-go/stats.go +++ b/vendor/github.com/segmentio/kafka-go/stats.go @@ -6,19 +6,21 @@ import ( ) // SummaryStats is a data structure that carries a summary of observed values. -// The average, minimum, and maximum are reported. type SummaryStats struct { - Avg int64 `metric:"avg" type:"gauge"` - Min int64 `metric:"min" type:"gauge"` - Max int64 `metric:"max" type:"gauge"` + Avg int64 `metric:"avg" type:"gauge"` + Min int64 `metric:"min" type:"gauge"` + Max int64 `metric:"max" type:"gauge"` + Count int64 `metric:"count" type:"counter"` + Sum int64 `metric:"sum" type:"counter"` } -// DurationStats is a data structure that carries a summary of observed duration -// values. The average, minimum, and maximum are reported. +// DurationStats is a data structure that carries a summary of observed duration values. type DurationStats struct { - Avg time.Duration `metric:"avg" type:"gauge"` - Min time.Duration `metric:"min" type:"gauge"` - Max time.Duration `metric:"max" type:"gauge"` + Avg time.Duration `metric:"avg" type:"gauge"` + Min time.Duration `metric:"min" type:"gauge"` + Max time.Duration `metric:"max" type:"gauge"` + Count int64 `metric:"count" type:"counter"` + Sum time.Duration `metric:"sum" type:"counter"` } // counter is an atomic incrementing counter which gets reset on snapshot. @@ -167,17 +169,21 @@ func (s *summary) snapshot() SummaryStats { } return SummaryStats{ - Avg: avg, - Min: min, - Max: max, + Avg: avg, + Min: min, + Max: max, + Count: count, + Sum: sum, } } func (s *summary) snapshotDuration() DurationStats { summary := s.snapshot() return DurationStats{ - Avg: time.Duration(summary.Avg), - Min: time.Duration(summary.Min), - Max: time.Duration(summary.Max), + Avg: time.Duration(summary.Avg), + Min: time.Duration(summary.Min), + Max: time.Duration(summary.Max), + Count: summary.Count, + Sum: time.Duration(summary.Sum), } } diff --git a/vendor/github.com/segmentio/kafka-go/transport.go b/vendor/github.com/segmentio/kafka-go/transport.go index 6ba2d638c..685bdddb1 100644 --- a/vendor/github.com/segmentio/kafka-go/transport.go +++ b/vendor/github.com/segmentio/kafka-go/transport.go @@ -60,7 +60,7 @@ type Transport struct { // Time limit set for establishing connections to the kafka cluster. This // limit includes all round trips done to establish the connections (TLS - // hadbhaske, SASL negotiation, etc...). + // handshake, SASL negotiation, etc...). // // Defaults to 5s. DialTimeout time.Duration @@ -81,6 +81,10 @@ type Transport struct { // Default to 6s. MetadataTTL time.Duration + // Topic names for the metadata cached by this transport. If this field is left blank, + // metadata information of all topics in the cluster will be retrieved. + MetadataTopics []string + // Unique identifier that the transport communicates to the brokers when it // sends requests. ClientID string @@ -150,7 +154,7 @@ func (t *Transport) CloseIdleConnections() { // package. // // The type of the response message will match the type of the request. For -// exmple, if RoundTrip was called with a *fetch.Request as argument, the value +// example, if RoundTrip was called with a *fetch.Request as argument, the value // returned will be of type *fetch.Response. It is safe for the program to do a // type assertion after checking that no error was returned. // @@ -235,14 +239,15 @@ func (t *Transport) grabPool(addr net.Addr) *connPool { p = &connPool{ refc: 2, - dial: t.dial(), - dialTimeout: t.dialTimeout(), - idleTimeout: t.idleTimeout(), - metadataTTL: t.metadataTTL(), - clientID: t.ClientID, - tls: t.TLS, - sasl: t.SASL, - resolver: t.Resolver, + dial: t.dial(), + dialTimeout: t.dialTimeout(), + idleTimeout: t.idleTimeout(), + metadataTTL: t.metadataTTL(), + metadataTopics: t.MetadataTopics, + clientID: t.ClientID, + tls: t.TLS, + sasl: t.SASL, + resolver: t.Resolver, ready: make(event), wake: make(chan event), @@ -276,14 +281,15 @@ type connPool struct { // Immutable fields of the connection pool. Connections access these field // on their parent pool in a ready-only fashion, so no synchronization is // required. - dial func(context.Context, string, string) (net.Conn, error) - dialTimeout time.Duration - idleTimeout time.Duration - metadataTTL time.Duration - clientID string - tls *tls.Config - sasl sasl.Mechanism - resolver BrokerResolver + dial func(context.Context, string, string) (net.Conn, error) + dialTimeout time.Duration + idleTimeout time.Duration + metadataTTL time.Duration + metadataTopics []string + clientID string + tls *tls.Config + sasl sasl.Mechanism + resolver BrokerResolver // Signaling mechanisms to orchestrate communications between the pool and // the rest of the program. once sync.Once // ensure that `ready` is triggered only once @@ -413,14 +419,16 @@ func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error) case *meta.Response: m := req.(*meta.Request) // If we get here with allow auto topic creation then - // we didn't have that topic in our cache so we should update + // we didn't have that topic in our cache, so we should update // the cache. if m.AllowAutoTopicCreation { topicsToRefresh := make([]string, 0, len(resp.Topics)) for _, topic := range resp.Topics { - // fixes issue 806: don't refresh topics that failed to create, - // it may means kafka doesn't enable auto topic creation. - // This causes the library to hang indefinitely, same as createtopics process. + // Don't refresh topics that failed to create, since that may + // mean that enable automatic topic creation is not enabled. + // That causes the library to hang indefinitely, same as + // don't refresh topics that failed to create, + // createtopics process. Fixes issue 806. if topic.ErrorCode != 0 { continue } @@ -590,13 +598,16 @@ func (p *connPool) discover(ctx context.Context, wake <-chan event) { var notify event done := ctx.Done() + req := &meta.Request{ + TopicNames: p.metadataTopics, + } + for { c, err := p.grabClusterConn(ctx) if err != nil { p.update(ctx, nil, err) } else { res := make(async, 1) - req := &meta.Request{} deadline, cancel := context.WithTimeout(ctx, p.metadataTTL) c.reqs <- connRequest{ ctx: deadline, diff --git a/vendor/github.com/segmentio/kafka-go/writer.go b/vendor/github.com/segmentio/kafka-go/writer.go index dc9c77f78..c37d8e041 100644 --- a/vendor/github.com/segmentio/kafka-go/writer.go +++ b/vendor/github.com/segmentio/kafka-go/writer.go @@ -27,29 +27,29 @@ import ( // by the function and test if it an instance of kafka.WriteErrors in order to // identify which messages have succeeded or failed, for example: // -// // Construct a synchronous writer (the default mode). -// w := &kafka.Writer{ -// Addr: Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), -// Topic: "topic-A", -// RequiredAcks: kafka.RequireAll, -// } +// // Construct a synchronous writer (the default mode). +// w := &kafka.Writer{ +// Addr: Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), +// Topic: "topic-A", +// RequiredAcks: kafka.RequireAll, +// } // -// ... +// ... // -// // Passing a context can prevent the operation from blocking indefinitely. -// switch err := w.WriteMessages(ctx, msgs...).(type) { -// case nil: -// case kafka.WriteErrors: -// for i := range msgs { -// if err[i] != nil { -// // handle the error writing msgs[i] -// ... +// // Passing a context can prevent the operation from blocking indefinitely. +// switch err := w.WriteMessages(ctx, msgs...).(type) { +// case nil: +// case kafka.WriteErrors: +// for i := range msgs { +// if err[i] != nil { +// // handle the error writing msgs[i] +// ... +// } // } +// default: +// // handle other errors +// ... // } -// default: -// // handle other errors -// ... -// } // // In asynchronous mode, the program may configure a completion handler on the // writer to receive notifications of messages being written to kafka: @@ -307,10 +307,6 @@ type WriterConfig struct { // a response to a produce request. The default is -1, which means to wait for // all replicas, and a value above 0 is required to indicate how many replicas // should acknowledge a message to be considered successful. - // - // This version of kafka-go (v0.3) does not support 0 required acks, due to - // some internal complexity implementing this with the Kafka protocol. If you - // need that functionality specifically, you'll need to upgrade to v0.4. RequiredAcks int // Setting this flag to true causes the WriteMessages method to never block. @@ -352,12 +348,13 @@ type WriterStats struct { Bytes int64 `metric:"kafka.writer.message.bytes" type:"counter"` Errors int64 `metric:"kafka.writer.error.count" type:"counter"` - BatchTime DurationStats `metric:"kafka.writer.batch.seconds"` - WriteTime DurationStats `metric:"kafka.writer.write.seconds"` - WaitTime DurationStats `metric:"kafka.writer.wait.seconds"` - Retries int64 `metric:"kafka.writer.retries.count" type:"counter"` - BatchSize SummaryStats `metric:"kafka.writer.batch.size"` - BatchBytes SummaryStats `metric:"kafka.writer.batch.bytes"` + BatchTime DurationStats `metric:"kafka.writer.batch.seconds"` + BatchQueueTime DurationStats `metric:"kafka.writer.batch.queue.seconds"` + WriteTime DurationStats `metric:"kafka.writer.write.seconds"` + WaitTime DurationStats `metric:"kafka.writer.wait.seconds"` + Retries int64 `metric:"kafka.writer.retries.count" type:"counter"` + BatchSize SummaryStats `metric:"kafka.writer.batch.size"` + BatchBytes SummaryStats `metric:"kafka.writer.batch.bytes"` MaxAttempts int64 `metric:"kafka.writer.attempts.max" type:"gauge"` WriteBackoffMin time.Duration `metric:"kafka.writer.backoff.min" type:"gauge"` @@ -402,6 +399,7 @@ type writerStats struct { errors counter dialTime summary batchTime summary + batchQueueTime summary writeTime summary waitTime summary retries counter @@ -537,7 +535,7 @@ func (w *Writer) enter() bool { // completed. func (w *Writer) leave() { w.group.Done() } -// spawn starts an new asynchronous operation on the writer. This method is used +// spawn starts a new asynchronous operation on the writer. This method is used // instead of starting goroutines inline to help manage the state of the // writer's wait group. The wait group is used to block Close calls until all // inflight operations have completed, therefore automatically including those @@ -602,9 +600,12 @@ func (w *Writer) Close() error { // // The context passed as first argument may also be used to asynchronously // cancel the operation. Note that in this case there are no guarantees made on -// whether messages were written to kafka. The program should assume that the -// whole batch failed and re-write the messages later (which could then cause -// duplicates). +// whether messages were written to kafka, they might also still be written +// after this method has already returned, therefore it is important to not +// modify byte slices of passed messages if WriteMessages returned early due +// to a canceled context. +// The program should assume that the whole batch failed and re-write the +// messages later (which could then cause duplicates). func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error { if w.Addr == nil { return errors.New("kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address") @@ -623,7 +624,7 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error { batchBytes := w.batchBytes() for i := range msgs { - n := int64(msgs[i].size()) + n := int64(msgs[i].totalSize()) if n > batchBytes { // This error is left for backward compatibility with historical // behavior, but it can yield O(N^2) behaviors. The expectations @@ -884,6 +885,7 @@ func (w *Writer) Stats() WriterStats { Errors: stats.errors.snapshot(), DialTime: stats.dialTime.snapshotDuration(), BatchTime: stats.batchTime.snapshotDuration(), + BatchQueueTime: stats.batchQueueTime.snapshotDuration(), WriteTime: stats.writeTime.snapshotDuration(), WaitTime: stats.waitTime.snapshotDuration(), Retries: stats.retries.snapshot(), @@ -1092,6 +1094,8 @@ func (ptw *partitionWriter) awaitBatch(batch *writeBatch) { // having it leak until it expires. batch.timer.Stop() } + stats := ptw.w.stats() + stats.batchQueueTime.observe(int64(time.Since(batch.time))) } func (ptw *partitionWriter) writeBatch(batch *writeBatch) { @@ -1152,7 +1156,7 @@ func (ptw *partitionWriter) writeBatch(batch *writeBatch) { stats.errors.observe(1) ptw.w.withErrorLogger(func(log Logger) { - log.Printf("error writing messages to %s (partition %d): %s", key.topic, key.partition, err) + log.Printf("error writing messages to %s (partition %d, attempt %d): %s", key.topic, key.partition, attempt, err) }) if !isTemporary(err) && !isTransientNetworkError(err) { @@ -1215,7 +1219,7 @@ func newWriteBatch(now time.Time, timeout time.Duration) *writeBatch { } func (b *writeBatch) add(msg Message, maxSize int, maxBytes int64) bool { - bytes := int64(msg.size()) + bytes := int64(msg.totalSize()) if b.size > 0 && (b.bytes+bytes) > maxBytes { return false diff --git a/vendor/github.com/xdg-go/pbkdf2/.gitignore b/vendor/github.com/xdg-go/pbkdf2/.gitignore new file mode 100644 index 000000000..f1c181ec9 --- /dev/null +++ b/vendor/github.com/xdg-go/pbkdf2/.gitignore @@ -0,0 +1,12 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out diff --git a/vendor/github.com/xdg/scram/LICENSE b/vendor/github.com/xdg-go/pbkdf2/LICENSE similarity index 100% rename from vendor/github.com/xdg/scram/LICENSE rename to vendor/github.com/xdg-go/pbkdf2/LICENSE diff --git a/vendor/github.com/xdg-go/pbkdf2/README.md b/vendor/github.com/xdg-go/pbkdf2/README.md new file mode 100644 index 000000000..d2824e456 --- /dev/null +++ b/vendor/github.com/xdg-go/pbkdf2/README.md @@ -0,0 +1,17 @@ +[![Go Reference](https://pkg.go.dev/badge/github.com/xdg-go/pbkdf2.svg)](https://pkg.go.dev/github.com/xdg-go/pbkdf2) +[![Go Report Card](https://goreportcard.com/badge/github.com/xdg-go/pbkdf2)](https://goreportcard.com/report/github.com/xdg-go/pbkdf2) +[![Github Actions](https://github.com/xdg-go/pbkdf2/actions/workflows/test.yml/badge.svg)](https://github.com/xdg-go/pbkdf2/actions/workflows/test.yml) + +# pbkdf2 – Go implementation of PBKDF2 + +## Description + +Package pbkdf2 provides password-based key derivation based on +[RFC 8018](https://tools.ietf.org/html/rfc8018). + +## Copyright and License + +Copyright 2021 by David A. Golden. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). You may +obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 diff --git a/vendor/github.com/xdg-go/pbkdf2/pbkdf2.go b/vendor/github.com/xdg-go/pbkdf2/pbkdf2.go new file mode 100644 index 000000000..029945ca4 --- /dev/null +++ b/vendor/github.com/xdg-go/pbkdf2/pbkdf2.go @@ -0,0 +1,76 @@ +// Copyright 2021 by David A. Golden. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + +// Package pbkdf2 implements password-based key derivation using the PBKDF2 +// algorithm described in RFC 2898 and RFC 8018. +// +// It provides a drop-in replacement for `golang.org/x/crypto/pbkdf2`, with +// the following benefits: +// +// - Released as a module with semantic versioning +// +// - Does not pull in dependencies for unrelated `x/crypto/*` packages +// +// - Supports Go 1.9+ +// +// See https://tools.ietf.org/html/rfc8018#section-4 for security considerations +// in the selection of a salt and iteration count. +package pbkdf2 + +import ( + "crypto/hmac" + "encoding/binary" + "hash" +) + +// Key generates a derived key from a password using the PBKDF2 algorithm. The +// inputs include salt bytes, the iteration count, desired key length, and a +// constructor for a hashing function. For example, for a 32-byte key using +// SHA-256: +// +// key := Key([]byte("trustNo1"), salt, 10000, 32, sha256.New) +func Key(password, salt []byte, iterCount, keyLen int, h func() hash.Hash) []byte { + prf := hmac.New(h, password) + hLen := prf.Size() + numBlocks := keyLen / hLen + // Get an extra block if keyLen is not an even number of hLen blocks. + if keyLen%hLen > 0 { + numBlocks++ + } + + Ti := make([]byte, hLen) + Uj := make([]byte, hLen) + dk := make([]byte, 0, hLen*numBlocks) + buf := make([]byte, 4) + + for i := uint32(1); i <= uint32(numBlocks); i++ { + // Initialize Uj for j == 1 from salt and block index. + // Initialize Ti = U1. + binary.BigEndian.PutUint32(buf, i) + prf.Reset() + prf.Write(salt) + prf.Write(buf) + Uj = Uj[:0] + Uj = prf.Sum(Uj) + + // Ti = U1 ^ U2 ^ ... ^ Ux + copy(Ti, Uj) + for j := 2; j <= iterCount; j++ { + prf.Reset() + prf.Write(Uj) + Uj = Uj[:0] + Uj = prf.Sum(Uj) + for k := range Uj { + Ti[k] ^= Uj[k] + } + } + + // DK = concat(T1, T2, ... Tn) + dk = append(dk, Ti...) + } + + return dk[0:keyLen] +} diff --git a/vendor/github.com/xdg/scram/.gitignore b/vendor/github.com/xdg-go/scram/.gitignore similarity index 100% rename from vendor/github.com/xdg/scram/.gitignore rename to vendor/github.com/xdg-go/scram/.gitignore diff --git a/vendor/github.com/xdg-go/scram/CHANGELOG.md b/vendor/github.com/xdg-go/scram/CHANGELOG.md new file mode 100644 index 000000000..b833be5e2 --- /dev/null +++ b/vendor/github.com/xdg-go/scram/CHANGELOG.md @@ -0,0 +1,26 @@ +# CHANGELOG + +## v1.1.2 - 2022-12-07 + +- Bump stringprep dependency to v1.0.4 for upstream CVE fix. + +## v1.1.1 - 2022-03-03 + +- Bump stringprep dependency to v1.0.3 for upstream CVE fix. + +## v1.1.0 - 2022-01-16 + +- Add SHA-512 hash generator function for convenience. + +## v1.0.2 - 2021-03-28 + +- Switch PBKDF2 dependency to github.com/xdg-go/pbkdf2 to + minimize transitive dependencies and support Go 1.9+. + +## v1.0.1 - 2021-03-27 + +- Bump stringprep dependency to v1.0.2 for Go 1.11 support. + +## v1.0.0 - 2021-03-27 + +- First release as a Go module diff --git a/vendor/github.com/xdg/stringprep/LICENSE b/vendor/github.com/xdg-go/scram/LICENSE similarity index 100% rename from vendor/github.com/xdg/stringprep/LICENSE rename to vendor/github.com/xdg-go/scram/LICENSE diff --git a/vendor/github.com/xdg-go/scram/README.md b/vendor/github.com/xdg-go/scram/README.md new file mode 100644 index 000000000..3a46f5ceb --- /dev/null +++ b/vendor/github.com/xdg-go/scram/README.md @@ -0,0 +1,72 @@ +[![Go Reference](https://pkg.go.dev/badge/github.com/xdg-go/scram.svg)](https://pkg.go.dev/github.com/xdg-go/scram) +[![Go Report Card](https://goreportcard.com/badge/github.com/xdg-go/scram)](https://goreportcard.com/report/github.com/xdg-go/scram) +[![Github Actions](https://github.com/xdg-go/scram/actions/workflows/test.yml/badge.svg)](https://github.com/xdg-go/scram/actions/workflows/test.yml) + +# scram – Go implementation of RFC-5802 + +## Description + +Package scram provides client and server implementations of the Salted +Challenge Response Authentication Mechanism (SCRAM) described in +[RFC-5802](https://tools.ietf.org/html/rfc5802) and +[RFC-7677](https://tools.ietf.org/html/rfc7677). + +It includes both client and server side support. + +Channel binding and extensions are not (yet) supported. + +## Examples + +### Client side + + package main + + import "github.com/xdg-go/scram" + + func main() { + // Get Client with username, password and (optional) authorization ID. + clientSHA1, err := scram.SHA1.NewClient("mulder", "trustno1", "") + if err != nil { + panic(err) + } + + // Prepare the authentication conversation. Use the empty string as the + // initial server message argument to start the conversation. + conv := clientSHA1.NewConversation() + var serverMsg string + + // Get the first message, send it and read the response. + firstMsg, err := conv.Step(serverMsg) + if err != nil { + panic(err) + } + serverMsg = sendClientMsg(firstMsg) + + // Get the second message, send it, and read the response. + secondMsg, err := conv.Step(serverMsg) + if err != nil { + panic(err) + } + serverMsg = sendClientMsg(secondMsg) + + // Validate the server's final message. We have no further message to + // send so ignore that return value. + _, err = conv.Step(serverMsg) + if err != nil { + panic(err) + } + + return + } + + func sendClientMsg(s string) string { + // A real implementation would send this to a server and read a reply. + return "" + } + +## Copyright and License + +Copyright 2018 by David A. Golden. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). You may +obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 diff --git a/vendor/github.com/xdg/scram/client.go b/vendor/github.com/xdg-go/scram/client.go similarity index 99% rename from vendor/github.com/xdg/scram/client.go rename to vendor/github.com/xdg-go/scram/client.go index ca0c4c711..5b53021b3 100644 --- a/vendor/github.com/xdg/scram/client.go +++ b/vendor/github.com/xdg-go/scram/client.go @@ -9,7 +9,7 @@ package scram import ( "sync" - "golang.org/x/crypto/pbkdf2" + "github.com/xdg-go/pbkdf2" ) // Client implements the client side of SCRAM authentication. It holds diff --git a/vendor/github.com/xdg/scram/client_conv.go b/vendor/github.com/xdg-go/scram/client_conv.go similarity index 100% rename from vendor/github.com/xdg/scram/client_conv.go rename to vendor/github.com/xdg-go/scram/client_conv.go diff --git a/vendor/github.com/xdg/scram/common.go b/vendor/github.com/xdg-go/scram/common.go similarity index 100% rename from vendor/github.com/xdg/scram/common.go rename to vendor/github.com/xdg-go/scram/common.go diff --git a/vendor/github.com/xdg/scram/doc.go b/vendor/github.com/xdg-go/scram/doc.go similarity index 62% rename from vendor/github.com/xdg/scram/doc.go rename to vendor/github.com/xdg-go/scram/doc.go index 0e060c180..82e8aeed8 100644 --- a/vendor/github.com/xdg/scram/doc.go +++ b/vendor/github.com/xdg-go/scram/doc.go @@ -4,18 +4,22 @@ // not use this file except in compliance with the License. You may obtain // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 -// Package scram is deprecated in favor of xdg-go/scram. +// Package scram provides client and server implementations of the Salted +// Challenge Response Authentication Mechanism (SCRAM) described in RFC-5802 +// and RFC-7677. // // Usage // -// The scram package provides two variables, `SHA1` and `SHA256`, that are -// used to construct Client or Server objects. +// The scram package provides variables, `SHA1`, `SHA256`, and `SHA512`, that +// are used to construct Client or Server objects. // // clientSHA1, err := scram.SHA1.NewClient(username, password, authID) // clientSHA256, err := scram.SHA256.NewClient(username, password, authID) +// clientSHA512, err := scram.SHA512.NewClient(username, password, authID) // // serverSHA1, err := scram.SHA1.NewServer(credentialLookupFcn) // serverSHA256, err := scram.SHA256.NewServer(credentialLookupFcn) +// serverSHA512, err := scram.SHA512.NewServer(credentialLookupFcn) // // These objects are used to construct ClientConversation or // ServerConversation objects that are used to carry out authentication. diff --git a/vendor/github.com/xdg/scram/parse.go b/vendor/github.com/xdg-go/scram/parse.go similarity index 100% rename from vendor/github.com/xdg/scram/parse.go rename to vendor/github.com/xdg-go/scram/parse.go diff --git a/vendor/github.com/xdg/scram/scram.go b/vendor/github.com/xdg-go/scram/scram.go similarity index 91% rename from vendor/github.com/xdg/scram/scram.go rename to vendor/github.com/xdg-go/scram/scram.go index 9e9836afe..a7b366027 100644 --- a/vendor/github.com/xdg/scram/scram.go +++ b/vendor/github.com/xdg-go/scram/scram.go @@ -9,10 +9,11 @@ package scram import ( "crypto/sha1" "crypto/sha256" + "crypto/sha512" "fmt" "hash" - "github.com/xdg/stringprep" + "github.com/xdg-go/stringprep" ) // HashGeneratorFcn abstracts a factory function that returns a hash.Hash @@ -29,6 +30,10 @@ var SHA1 HashGeneratorFcn = func() hash.Hash { return sha1.New() } // to create Client objects configured for SHA-256 hashing. var SHA256 HashGeneratorFcn = func() hash.Hash { return sha256.New() } +// SHA512 is a function that returns a crypto/sha512 hasher and should be used +// to create Client objects configured for SHA-512 hashing. +var SHA512 HashGeneratorFcn = func() hash.Hash { return sha512.New() } + // NewClient constructs a SCRAM client component based on a given hash.Hash // factory receiver. This constructor will normalize the username, password // and authzID via the SASLprep algorithm, as recommended by RFC-5802. If diff --git a/vendor/github.com/xdg/scram/server.go b/vendor/github.com/xdg-go/scram/server.go similarity index 100% rename from vendor/github.com/xdg/scram/server.go rename to vendor/github.com/xdg-go/scram/server.go diff --git a/vendor/github.com/xdg/scram/server_conv.go b/vendor/github.com/xdg-go/scram/server_conv.go similarity index 100% rename from vendor/github.com/xdg/scram/server_conv.go rename to vendor/github.com/xdg-go/scram/server_conv.go diff --git a/vendor/github.com/xdg/stringprep/.gitignore b/vendor/github.com/xdg-go/stringprep/.gitignore similarity index 100% rename from vendor/github.com/xdg/stringprep/.gitignore rename to vendor/github.com/xdg-go/stringprep/.gitignore diff --git a/vendor/github.com/xdg-go/stringprep/CHANGELOG.md b/vendor/github.com/xdg-go/stringprep/CHANGELOG.md new file mode 100644 index 000000000..04b9753cd --- /dev/null +++ b/vendor/github.com/xdg-go/stringprep/CHANGELOG.md @@ -0,0 +1,36 @@ +# CHANGELOG + + +## [v1.0.4] - 2022-12-07 + +### Maintenance + +- Bump golang.org/x/text to v0.3.8 due to CVE-2022-32149 + + +## [v1.0.3] - 2022-03-01 + +### Maintenance + +- Bump golang.org/x/text to v0.3.7 due to CVE-2021-38561 + + +## [v1.0.2] - 2021-03-27 + +### Maintenance + +- Change minimum Go version to 1.11 + + +## [v1.0.1] - 2021-03-24 + +### Bug Fixes + +- Add go.mod file + + +## [v1.0.0] - 2018-02-21 + +[v1.0.2]: https://github.com/xdg-go/stringprep/releases/tag/v1.0.2 +[v1.0.1]: https://github.com/xdg-go/stringprep/releases/tag/v1.0.1 +[v1.0.0]: https://github.com/xdg-go/stringprep/releases/tag/v1.0.0 diff --git a/vendor/github.com/xdg-go/stringprep/LICENSE b/vendor/github.com/xdg-go/stringprep/LICENSE new file mode 100644 index 000000000..67db85882 --- /dev/null +++ b/vendor/github.com/xdg-go/stringprep/LICENSE @@ -0,0 +1,175 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. diff --git a/vendor/github.com/xdg-go/stringprep/README.md b/vendor/github.com/xdg-go/stringprep/README.md new file mode 100644 index 000000000..83ea5346d --- /dev/null +++ b/vendor/github.com/xdg-go/stringprep/README.md @@ -0,0 +1,28 @@ +[![Go Reference](https://pkg.go.dev/badge/github.com/xdg-go/stringprep.svg)](https://pkg.go.dev/github.com/xdg-go/stringprep) +[![Go Report Card](https://goreportcard.com/badge/github.com/xdg-go/stringprep)](https://goreportcard.com/report/github.com/xdg-go/stringprep) +[![Github Actions](https://github.com/xdg-go/stringprep/actions/workflows/test.yml/badge.svg)](https://github.com/xdg-go/stringprep/actions/workflows/test.yml) + +# stringprep – Go implementation of RFC-3454 stringprep and RFC-4013 SASLprep + +## Synopsis + +``` + import "github.com/xdg-go/stringprep" + + prepped := stringprep.SASLprep.Prepare("TrustNô1") + +``` + +## Description + +This library provides an implementation of the stringprep algorithm +(RFC-3454) in Go, including all data tables. + +A pre-built SASLprep (RFC-4013) profile is provided as well. + +## Copyright and License + +Copyright 2018 by David A. Golden. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"). You may +obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 diff --git a/vendor/github.com/xdg/stringprep/bidi.go b/vendor/github.com/xdg-go/stringprep/bidi.go similarity index 100% rename from vendor/github.com/xdg/stringprep/bidi.go rename to vendor/github.com/xdg-go/stringprep/bidi.go diff --git a/vendor/github.com/xdg/stringprep/doc.go b/vendor/github.com/xdg-go/stringprep/doc.go similarity index 100% rename from vendor/github.com/xdg/stringprep/doc.go rename to vendor/github.com/xdg-go/stringprep/doc.go diff --git a/vendor/github.com/xdg/stringprep/error.go b/vendor/github.com/xdg-go/stringprep/error.go similarity index 100% rename from vendor/github.com/xdg/stringprep/error.go rename to vendor/github.com/xdg-go/stringprep/error.go diff --git a/vendor/github.com/xdg/stringprep/map.go b/vendor/github.com/xdg-go/stringprep/map.go similarity index 100% rename from vendor/github.com/xdg/stringprep/map.go rename to vendor/github.com/xdg-go/stringprep/map.go diff --git a/vendor/github.com/xdg/stringprep/profile.go b/vendor/github.com/xdg-go/stringprep/profile.go similarity index 100% rename from vendor/github.com/xdg/stringprep/profile.go rename to vendor/github.com/xdg-go/stringprep/profile.go diff --git a/vendor/github.com/xdg/stringprep/saslprep.go b/vendor/github.com/xdg-go/stringprep/saslprep.go similarity index 100% rename from vendor/github.com/xdg/stringprep/saslprep.go rename to vendor/github.com/xdg-go/stringprep/saslprep.go diff --git a/vendor/github.com/xdg/stringprep/set.go b/vendor/github.com/xdg-go/stringprep/set.go similarity index 100% rename from vendor/github.com/xdg/stringprep/set.go rename to vendor/github.com/xdg-go/stringprep/set.go diff --git a/vendor/github.com/xdg/stringprep/tables.go b/vendor/github.com/xdg-go/stringprep/tables.go similarity index 100% rename from vendor/github.com/xdg/stringprep/tables.go rename to vendor/github.com/xdg-go/stringprep/tables.go diff --git a/vendor/github.com/xdg/scram/.travis.yml b/vendor/github.com/xdg/scram/.travis.yml deleted file mode 100644 index f391327ea..000000000 --- a/vendor/github.com/xdg/scram/.travis.yml +++ /dev/null @@ -1,11 +0,0 @@ -language: go -sudo: false -go: - - "1.7" - - "1.8" - - "1.9" - - "1.10" - - master -matrix: - allow_failures: - - go: master diff --git a/vendor/github.com/xdg/scram/README.md b/vendor/github.com/xdg/scram/README.md deleted file mode 100644 index 311a2ec32..000000000 --- a/vendor/github.com/xdg/scram/README.md +++ /dev/null @@ -1,5 +0,0 @@ -**DON'T USE THIS PACKAGE** - use [`xdg-go/scram`](https://pkg.go.dev/github.com/xdg-go/scram) instead! - -I renamed this to [`xdg-go/scram`](https://pkg.go.dev/github.com/xdg-go/scram) in October 2018. This didn't break dependencies at the time because Github redirected requests. In March 2021, I made `xdg-go/scram` a module, which can't be used as `xdg/scram` with Github redirects. This repository has been recreated to support legacy dependencies. - -See my article [How I broke the MongoDB Go driver ecosystem](https://xdg.me/i-broke-the-mongodb-go-driver-ecosystem/) for more details. diff --git a/vendor/github.com/xdg/stringprep/.travis.yml b/vendor/github.com/xdg/stringprep/.travis.yml deleted file mode 100644 index f391327ea..000000000 --- a/vendor/github.com/xdg/stringprep/.travis.yml +++ /dev/null @@ -1,11 +0,0 @@ -language: go -sudo: false -go: - - "1.7" - - "1.8" - - "1.9" - - "1.10" - - master -matrix: - allow_failures: - - go: master diff --git a/vendor/github.com/xdg/stringprep/README.md b/vendor/github.com/xdg/stringprep/README.md deleted file mode 100644 index 2a8722172..000000000 --- a/vendor/github.com/xdg/stringprep/README.md +++ /dev/null @@ -1 +0,0 @@ -This was renamed to [`xdg-go/stringprep`](https://github.com/xdg/stringprep) in October 2018. This didn't break dependencies at the time because Github redirected requests. In March 2021, I made `xdg-go/stringprep` a module, which can't be used as `xdg/stringprep` with Github redirects. This repository has been recreated to support legacy dependencies. diff --git a/vendor/golang.org/x/crypto/pbkdf2/pbkdf2.go b/vendor/golang.org/x/crypto/pbkdf2/pbkdf2.go deleted file mode 100644 index 904b57e01..000000000 --- a/vendor/golang.org/x/crypto/pbkdf2/pbkdf2.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2012 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -/* -Package pbkdf2 implements the key derivation function PBKDF2 as defined in RFC -2898 / PKCS #5 v2.0. - -A key derivation function is useful when encrypting data based on a password -or any other not-fully-random data. It uses a pseudorandom function to derive -a secure encryption key based on the password. - -While v2.0 of the standard defines only one pseudorandom function to use, -HMAC-SHA1, the drafted v2.1 specification allows use of all five FIPS Approved -Hash Functions SHA-1, SHA-224, SHA-256, SHA-384 and SHA-512 for HMAC. To -choose, you can pass the `New` functions from the different SHA packages to -pbkdf2.Key. -*/ -package pbkdf2 // import "golang.org/x/crypto/pbkdf2" - -import ( - "crypto/hmac" - "hash" -) - -// Key derives a key from the password, salt and iteration count, returning a -// []byte of length keylen that can be used as cryptographic key. The key is -// derived based on the method described as PBKDF2 with the HMAC variant using -// the supplied hash function. -// -// For example, to use a HMAC-SHA-1 based PBKDF2 key derivation function, you -// can get a derived key for e.g. AES-256 (which needs a 32-byte key) by -// doing: -// -// dk := pbkdf2.Key([]byte("some password"), salt, 4096, 32, sha1.New) -// -// Remember to get a good random salt. At least 8 bytes is recommended by the -// RFC. -// -// Using a higher iteration count will increase the cost of an exhaustive -// search but will also make derivation proportionally slower. -func Key(password, salt []byte, iter, keyLen int, h func() hash.Hash) []byte { - prf := hmac.New(h, password) - hashLen := prf.Size() - numBlocks := (keyLen + hashLen - 1) / hashLen - - var buf [4]byte - dk := make([]byte, 0, numBlocks*hashLen) - U := make([]byte, hashLen) - for block := 1; block <= numBlocks; block++ { - // N.B.: || means concatenation, ^ means XOR - // for each block T_i = U_1 ^ U_2 ^ ... ^ U_iter - // U_1 = PRF(password, salt || uint(i)) - prf.Reset() - prf.Write(salt) - buf[0] = byte(block >> 24) - buf[1] = byte(block >> 16) - buf[2] = byte(block >> 8) - buf[3] = byte(block) - prf.Write(buf[:4]) - dk = prf.Sum(dk) - T := dk[len(dk)-hashLen:] - copy(U, T) - - // U_n = PRF(password, U_(n-1)) - for n := 2; n <= iter; n++ { - prf.Reset() - prf.Write(U) - U = U[:0] - U = prf.Sum(U) - for x := range U { - T[x] ^= U[x] - } - } - } - return dk[:keyLen] -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 962191f06..2e9d664ef 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -328,7 +328,7 @@ github.com/prometheus/prometheus/util/strutil # github.com/rs/xid v1.5.0 ## explicit; go 1.12 github.com/rs/xid -# github.com/segmentio/kafka-go v0.4.38 +# github.com/segmentio/kafka-go v0.4.43 ## explicit; go 1.15 github.com/segmentio/kafka-go github.com/segmentio/kafka-go/compress @@ -339,16 +339,23 @@ github.com/segmentio/kafka-go/compress/zstd github.com/segmentio/kafka-go/protocol github.com/segmentio/kafka-go/protocol/addoffsetstotxn github.com/segmentio/kafka-go/protocol/addpartitionstotxn +github.com/segmentio/kafka-go/protocol/alterclientquotas github.com/segmentio/kafka-go/protocol/alterconfigs github.com/segmentio/kafka-go/protocol/alterpartitionreassignments +github.com/segmentio/kafka-go/protocol/alteruserscramcredentials github.com/segmentio/kafka-go/protocol/apiversions github.com/segmentio/kafka-go/protocol/consumer github.com/segmentio/kafka-go/protocol/createacls github.com/segmentio/kafka-go/protocol/createpartitions github.com/segmentio/kafka-go/protocol/createtopics +github.com/segmentio/kafka-go/protocol/deleteacls +github.com/segmentio/kafka-go/protocol/deletegroups github.com/segmentio/kafka-go/protocol/deletetopics +github.com/segmentio/kafka-go/protocol/describeacls +github.com/segmentio/kafka-go/protocol/describeclientquotas github.com/segmentio/kafka-go/protocol/describeconfigs github.com/segmentio/kafka-go/protocol/describegroups +github.com/segmentio/kafka-go/protocol/describeuserscramcredentials github.com/segmentio/kafka-go/protocol/electleaders github.com/segmentio/kafka-go/protocol/endtxn github.com/segmentio/kafka-go/protocol/fetch @@ -429,12 +436,15 @@ github.com/vmware/go-ipfix/pkg/entities github.com/vmware/go-ipfix/pkg/exporter github.com/vmware/go-ipfix/pkg/registry github.com/vmware/go-ipfix/pkg/util -# github.com/xdg/scram v1.0.5 -## explicit -github.com/xdg/scram -# github.com/xdg/stringprep v1.0.3 -## explicit -github.com/xdg/stringprep +# github.com/xdg-go/pbkdf2 v1.0.0 +## explicit; go 1.9 +github.com/xdg-go/pbkdf2 +# github.com/xdg-go/scram v1.1.2 +## explicit; go 1.11 +github.com/xdg-go/scram +# github.com/xdg-go/stringprep v1.0.4 +## explicit; go 1.11 +github.com/xdg-go/stringprep # go.uber.org/atomic v1.9.0 ## explicit; go 1.13 go.uber.org/atomic @@ -446,7 +456,6 @@ golang.org/x/crypto/cryptobyte golang.org/x/crypto/cryptobyte/asn1 golang.org/x/crypto/curve25519 golang.org/x/crypto/curve25519/internal/field -golang.org/x/crypto/pbkdf2 # golang.org/x/net v0.14.0 ## explicit; go 1.17 golang.org/x/net/context