Skip to content

Commit 93bcdde

Browse files
authored
KIP-430: Return Authorized Operations in Describe Responses (#2656)
1 parent 8fb3f96 commit 93bcdde

File tree

10 files changed

+213
-105
lines changed

10 files changed

+213
-105
lines changed

kafka/admin/acl_resource.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from __future__ import absolute_import
2-
from kafka.errors import IllegalArgumentError
32

43
# enum in stdlib as of py3.4
54
try:
@@ -8,6 +7,8 @@
87
# vendored backport module
98
from kafka.vendor.enum34 import IntEnum
109

10+
from kafka.errors import IllegalArgumentError
11+
1112

1213
class ResourceType(IntEnum):
1314
"""Type of kafka resource to set ACL for
@@ -30,6 +31,7 @@ class ACLOperation(IntEnum):
3031
The ANY value is only valid in a filter context
3132
"""
3233

34+
UNKNOWN = 0,
3335
ANY = 1,
3436
ALL = 2,
3537
READ = 3,
@@ -41,7 +43,9 @@ class ACLOperation(IntEnum):
4143
CLUSTER_ACTION = 9,
4244
DESCRIBE_CONFIGS = 10,
4345
ALTER_CONFIGS = 11,
44-
IDEMPOTENT_WRITE = 12
46+
IDEMPOTENT_WRITE = 12,
47+
CREATE_TOKENS = 13,
48+
DESCRIBE_TOKENS = 13
4549

4650

4751
class ACLPermissionType(IntEnum):
@@ -50,6 +54,7 @@ class ACLPermissionType(IntEnum):
5054
The ANY value is only valid in a filter context
5155
"""
5256

57+
UNKNOWN = 0,
5358
ANY = 1,
5459
DENY = 2,
5560
ALLOW = 3
@@ -63,6 +68,7 @@ class ACLResourcePatternType(IntEnum):
6368
https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs
6469
"""
6570

71+
UNKNOWN = 0,
6672
ANY = 1,
6773
MATCH = 2,
6874
LITERAL = 3,
@@ -242,3 +248,7 @@ def validate(self):
242248
raise IllegalArgumentError(
243249
"pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name)
244250
)
251+
252+
253+
def valid_acl_operations(int_vals):
254+
return set([ACLOperation(v) for v in int_vals if v not in (0, 1, 2)])

kafka/admin/client.py

Lines changed: 98 additions & 91 deletions
Large diffs are not rendered by default.

kafka/client_async.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -978,15 +978,17 @@ def _maybe_refresh_metadata(self, wakeup=False):
978978
if not topics and self.cluster.is_bootstrap(node_id):
979979
topics = list(self.config['bootstrap_topics_filter'])
980980

981-
api_version = self.api_version(MetadataRequest, max_version=7)
981+
api_version = self.api_version(MetadataRequest, max_version=8)
982982
if self.cluster.need_all_topic_metadata:
983983
topics = MetadataRequest[api_version].ALL_TOPICS
984984
elif not topics:
985985
topics = MetadataRequest[api_version].NO_TOPICS
986-
if api_version >= 4:
986+
if api_version <= 3:
987+
request = MetadataRequest[api_version](topics)
988+
elif api_version <= 7:
987989
request = MetadataRequest[api_version](topics, self.config['allow_auto_create_topics'])
988990
else:
989-
request = MetadataRequest[api_version](topics)
991+
request = MetadataRequest[api_version](topics, self.config['allow_auto_create_topics'], False, False)
990992
log.debug("Sending metadata request %s to node %s", request, node_id)
991993
future = self.send(node_id, request, wakeup=wakeup)
992994
future.add_callback(self.cluster.update_metadata)

kafka/cluster.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,10 @@ def update_metadata(self, metadata):
279279
if metadata.API_VERSION == 0:
280280
error_code, topic, partitions = topic_data
281281
is_internal = False
282-
else:
282+
elif metadata.API_VERSION <= 7:
283283
error_code, topic, is_internal, partitions = topic_data
284+
else:
285+
error_code, topic, is_internal, partitions, _authorized_operations = topic_data
284286
if is_internal:
285287
_new_internal_topics.add(topic)
286288
error_type = Errors.for_code(error_code)

kafka/protocol/admin.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from kafka.vendor.enum34 import IntEnum
99

1010
from kafka.protocol.api import Request, Response
11-
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields
11+
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields, BitField
1212

1313

1414
class CreateTopicsResponse_v0(Response):
@@ -337,8 +337,8 @@ class DescribeGroupsResponse_v3(Response):
337337
('client_id', String('utf-8')),
338338
('client_host', String('utf-8')),
339339
('member_metadata', Bytes),
340-
('member_assignment', Bytes)))),
341-
('authorized_operations', Int32))
340+
('member_assignment', Bytes))),
341+
('authorized_operations', BitField)))
342342
)
343343

344344

@@ -368,7 +368,7 @@ class DescribeGroupsRequest_v2(Request):
368368
class DescribeGroupsRequest_v3(Request):
369369
API_KEY = 15
370370
API_VERSION = 3
371-
RESPONSE_TYPE = DescribeGroupsResponse_v2
371+
RESPONSE_TYPE = DescribeGroupsResponse_v3
372372
SCHEMA = Schema(
373373
('groups', Array(String('utf-8'))),
374374
('include_authorized_operations', Boolean)

kafka/protocol/metadata.py

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import absolute_import
22

33
from kafka.protocol.api import Request, Response
4-
from kafka.protocol.types import Array, Boolean, Int16, Int32, Schema, String
4+
from kafka.protocol.types import Array, Boolean, Int16, Int32, Schema, String, BitField
55

66

77
class MetadataResponse_v0(Response):
@@ -164,6 +164,36 @@ class MetadataResponse_v7(Response):
164164
)
165165

166166

167+
class MetadataResponse_v8(Response):
168+
"""v8 adds authorized_operations fields"""
169+
API_KEY = 3
170+
API_VERSION = 8
171+
SCHEMA = Schema(
172+
('throttle_time_ms', Int32),
173+
('brokers', Array(
174+
('node_id', Int32),
175+
('host', String('utf-8')),
176+
('port', Int32),
177+
('rack', String('utf-8')))),
178+
('cluster_id', String('utf-8')),
179+
('controller_id', Int32),
180+
('topics', Array(
181+
('error_code', Int16),
182+
('topic', String('utf-8')),
183+
('is_internal', Boolean),
184+
('partitions', Array(
185+
('error_code', Int16),
186+
('partition', Int32),
187+
('leader', Int32),
188+
('leader_epoch', Int32),
189+
('replicas', Array(Int32)),
190+
('isr', Array(Int32)),
191+
('offline_replicas', Array(Int32)))),
192+
('authorized_operations', BitField))),
193+
('authorized_operations', BitField)
194+
)
195+
196+
167197
class MetadataRequest_v0(Request):
168198
API_KEY = 3
169199
API_VERSION = 0
@@ -245,13 +275,27 @@ class MetadataRequest_v7(Request):
245275
NO_TOPICS = []
246276

247277

278+
class MetadataRequest_v8(Request):
279+
API_KEY = 3
280+
API_VERSION = 8
281+
RESPONSE_TYPE = MetadataResponse_v8
282+
SCHEMA = Schema(
283+
('topics', Array(String('utf-8'))),
284+
('allow_auto_topic_creation', Boolean),
285+
('include_cluster_authorized_operations', Boolean),
286+
('include_topic_authorized_operations', Boolean)
287+
)
288+
ALL_TOPICS = None
289+
NO_TOPICS = []
290+
291+
248292
MetadataRequest = [
249293
MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2,
250294
MetadataRequest_v3, MetadataRequest_v4, MetadataRequest_v5,
251-
MetadataRequest_v6, MetadataRequest_v7,
295+
MetadataRequest_v6, MetadataRequest_v7, MetadataRequest_v8,
252296
]
253297
MetadataResponse = [
254298
MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2,
255299
MetadataResponse_v3, MetadataResponse_v4, MetadataResponse_v5,
256-
MetadataResponse_v6, MetadataResponse_v7,
300+
MetadataResponse_v6, MetadataResponse_v7, MetadataResponse_v8,
257301
]

kafka/protocol/types.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,3 +363,34 @@ def decode(self, data):
363363
return None
364364
return [self.array_of.decode(data) for _ in range(length)]
365365

366+
367+
class BitField(AbstractType):
368+
@classmethod
369+
def decode(cls, data):
370+
return cls.from_32_bit_field(Int32.decode(data))
371+
372+
@classmethod
373+
def encode(cls, vals):
374+
# to_32_bit_field returns unsigned val, so we need to
375+
# encode >I to avoid crash if/when byte 31 is set
376+
# (note that decode as signed still works fine)
377+
return struct.Struct('>I').pack(cls.to_32_bit_field(vals))
378+
379+
@classmethod
380+
def to_32_bit_field(cls, vals):
381+
value = 0
382+
for b in vals:
383+
assert 0 <= b < 32
384+
value |= 1 << b
385+
return value
386+
387+
@classmethod
388+
def from_32_bit_field(cls, value):
389+
result = set()
390+
count = 0
391+
while value != 0:
392+
if (value & 1) != 0:
393+
result.add(count)
394+
count += 1
395+
value = (value & 0xFFFFFFFF) >> 1
396+
return result

kafka/util.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,3 +138,4 @@ def wrapper(self, *args, **kwargs):
138138
return func(self, *args, **kwargs)
139139
functools.update_wrapper(wrapper, func)
140140
return wrapper
141+

test/test_protocol.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22
import io
33
import struct
44

5+
import pytest
6+
57
from kafka.protocol.api import RequestHeader
68
from kafka.protocol.fetch import FetchRequest, FetchResponse
79
from kafka.protocol.find_coordinator import FindCoordinatorRequest
810
from kafka.protocol.message import Message, MessageSet, PartialMessage
911
from kafka.protocol.metadata import MetadataRequest
10-
from kafka.protocol.types import Int16, Int32, Int64, String, UnsignedVarInt32, CompactString, CompactArray, CompactBytes
12+
from kafka.protocol.types import Int16, Int32, Int64, String, UnsignedVarInt32, CompactString, CompactArray, CompactBytes, BitField
1113

1214

1315
def test_create_message():
@@ -332,3 +334,11 @@ def test_compact_data_structs():
332334
assert CompactBytes.decode(io.BytesIO(b'\x01')) == b''
333335
enc = CompactBytes.encode(b'foo')
334336
assert CompactBytes.decode(io.BytesIO(enc)) == b'foo'
337+
338+
339+
@pytest.mark.parametrize(('test_set',), [
340+
(set([0, 1, 5, 10, 31]),),
341+
(set(range(32)),),
342+
])
343+
def test_bit_field(test_set):
344+
assert BitField.decode(io.BytesIO(BitField.encode(test_set))) == test_set

test/test_util.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@
2222
def test_topic_name_validation(topic_name, expectation):
2323
with expectation:
2424
ensure_valid_topic_name(topic_name)
25+

0 commit comments

Comments
 (0)