update kafka blackbox tests
4eUeP committed Dec 21, 2023
1 parent a511add commit 39adb53
Showing 2 changed files with 181 additions and 3 deletions.
22 changes: 22 additions & 0 deletions hstream-kafka/tests/blackbox/common.py
@@ -1,6 +1,7 @@
import socket
import time
from kafka.conn import BrokerConnection
from kafka.record import MemoryRecords, MemoryRecordsBuilder


# Also: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html#kafka.KafkaClient.send
@@ -24,3 +25,24 @@ def send_req(port, req, resp_count=1):
        return resps[0]
    else:
        return resps


# magic: record format version, one of [0, 1, 2]
# compression_type: 0 (none), 1 (gzip), 2 (snappy), 3 (lz4); fixed to 0 here
def encode_records(magic, key, value, n):
    builder = MemoryRecordsBuilder(
        magic=magic, compression_type=0, batch_size=1024 * 10
    )
    for offset in range(n):
        builder.append(timestamp=10000 + offset, key=key, value=value)
    builder.close()
    return builder.buffer()


def decode_records(records_bs):
    record_batch = MemoryRecords(records_bs)
    rets = []
    while record_batch.has_next():
        batch = record_batch.next_batch()
        rets.append(list(batch))
    return rets
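
As an aside, a minimal round-trip sketch of these two helpers (illustrative only, not part of this commit; assumes kafka-python is installed and that MemoryRecordsBuilder packs all three records into a single batch):

# Encode three magic-2 records and decode them back.
buf = encode_records(2, b"k", b"hello", 3)
batches = decode_records(buf)
assert len(batches) == 1 and len(batches[0]) == 3
assert batches[0][0].value == b"hello"
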
162 changes: 159 additions & 3 deletions hstream-kafka/tests/blackbox/test_consumer.py
@@ -1,23 +1,29 @@
import pytest
import timeit
import time
import random
import string

import kafka
from kafka import KafkaConsumer
from kafka.admin import NewTopic
from kafka.structs import TopicPartition
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.produce import ProduceRequest, ProduceResponse

from common import send_req
from common import send_req, encode_records, decode_records

topic_name = "blackbox_test_topic"


def force_create_topic(admin, topic_name):
def force_create_topic(admin, topic_name, partitions=1):
    try:
        admin.delete_topics([topic_name])
    except kafka.errors.UnknownTopicOrPartitionError:
        pass
    topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
    topic = NewTopic(
        name=topic_name, num_partitions=partitions, replication_factor=1
    )
    admin.create_topics([topic])


@@ -95,3 +101,153 @@ def test_offsets_of_empty_topic(new_topic, kafka_port, hstream_kafka_port):
    assert kafka_consumer.end_offsets(
        [partition]
    ) == hstream_kafka_consumer.end_offsets([partition])


# parts: [(partition, payload_length)]
# magic 0/1 message sets are sent with ProduceRequest v2; magic 2 record
# batches use ProduceRequest v3, which added the transactional_id field.
def gen_produce(magic, topic_name, parts):
    topic_data = [
        (p, encode_records(magic, b"", b"h" * length, 1)) for p, length in parts
    ]
    if magic <= 1:
        return ProduceRequest[2](
            required_acks=-1,
            timeout=1000,
            topics=[(topic_name, topic_data)],
        )
    else:
        return ProduceRequest[3](
            transactional_id=None,
            required_acks=-1,
            timeout=10000,
            topics=[(topic_name, topic_data)],
        )


def send_produce(port, req, exp_topic_name):
    produce_resp = send_req(port, req)
    for t in produce_resp.topics:
        assert t[0] == exp_topic_name
        for _, errcode, *xs in t[1]:
            assert errcode == 0


# parts: [(partition, offset, partition_max_bytes)]
# max_bytes caps the whole fetch response (KIP-74); partition_max_bytes
# additionally caps each individual partition.
def gen_fetch(max_bytes, topic_name, parts):
    return FetchRequest[4](
        -1,  # replica_id
        500,  # fetch_max_wait_ms
        10,  # fetch_min_bytes
        max_bytes,
        0,  # isolation_level
        [(topic_name, parts)],
    )


def send_fetch(port, req, exp_topic_name):
    fetch_resp = send_req(port, req)
    for topic, partitions in fetch_resp.topics:
        assert topic == exp_topic_name
        for (
            partition_id,
            error_code,
            highwater_offset,
            last_stable_offset,  # v4
            aborted_transactions,  # v4
            records_bs,
        ) in partitions:
            assert error_code == 0
            yield records_bs, decode_records(records_bs)


@pytest.fixture
def setup_TestFetchMaxBytes(kafka_admin_client, hstream_kafka_admin_client):
    topic_name = "".join(
        random.choices(string.ascii_uppercase + string.digits, k=10)
    )
    force_create_topic(kafka_admin_client, topic_name, partitions=2)
    force_create_topic(hstream_kafka_admin_client, topic_name, partitions=2)
    # Wait for topic creation to finish
    time.sleep(1)
    yield topic_name
    kafka_admin_client.delete_topics([topic_name])
    hstream_kafka_admin_client.delete_topics([topic_name])


@pytest.mark.usefixtures("setup_TestFetchMaxBytes")
@pytest.mark.parametrize("magic", [2])
@pytest.mark.parametrize("port_var", ["kafka_port", "hstream_kafka_port"])
class TestFetchMaxBytes:
    def test_max_bytes(self, port_var, magic, setup_TestFetchMaxBytes, request):
        port = request.getfixturevalue(port_var)
        topic = setup_TestFetchMaxBytes

        # Encoded sizes (bytes) of the record batches in partition 0
        # (roughly a fixed magic-2 batch header plus per-record framing
        # and the payload):
        #
        #   record1  record2
        #   118      88
        send_produce(
            port,
            gen_produce(magic, topic, [(0, 50), (1, 50)]),
            topic,
        )
        send_produce(
            port,
            gen_produce(magic, topic, [(0, 20), (1, 20)]),
            topic,
        )

        # Case 1: max_bytes (10) is smaller than the first 118-byte batch,
        # but the broker still returns that batch in full.
        [(records_bs, [[record]])] = list(
            send_fetch(
                port,
                gen_fetch(10, topic, [(0, 0, 2000)]),
                topic,
            )
        )
        assert len(records_bs) == 118
        assert record.value == b"h" * 50

        # Case 2: max_bytes covers the first batch plus 10 extra bytes; the
        # raw response is truncated to 128 bytes, but only the first batch
        # is decodable.
        [(records_bs, [[record]])] = list(
            send_fetch(
                port,
                gen_fetch(118 + 10, topic, [(0, 0, 2000)]),
                topic,
            )
        )
        assert len(records_bs) == 128
        assert record.value == b"h" * 50

        # Case 3: max_bytes fits both batches exactly, so both are returned.
        [(records_bs, [[record1], [record2]])] = list(
            send_fetch(
                port,
                gen_fetch(118 + 88, topic, [(0, 0, 2000)]),
                topic,
            )
        )
        assert len(records_bs) == 118 + 88
        assert record1.value == b"h" * 50
        assert record2.value == b"h" * 20

        # Case 4: after partition 0's 118-byte batch, the remaining overall
        # budget (117 bytes) is smaller than partition 1's 118-byte batch,
        # so partition 1 returns nothing.
        [(records0_bs, records0), (records1_bs, records1)] = list(
            send_fetch(
                port,
                gen_fetch(118 + 117, topic, [(0, 0, 118), (1, 0, 118)]),
                topic,
            )
        )
        assert len(records0_bs) == 118
        assert len(records1_bs) == 0

        # Case 5: the budget (118 + 118) covers one full batch per partition,
        # so both partitions return their first batch.
        [(records0_bs, records0), (records1_bs, records1)] = list(
            send_fetch(
                port,
                gen_fetch(118 + 118, topic, [(0, 0, 118), (1, 0, 118)]),
                topic,
            )
        )
        assert len(records0_bs) == 118
        assert len(records1_bs) == 118

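Usage note (an assumption, since the conftest that provides the kafka_port / hstream_kafka_port and admin-client fixtures is not shown in this diff): with both a reference Kafka broker and an HStream Kafka endpoint running, the new test can be selected with, e.g., pytest hstream-kafka/tests/blackbox/test_consumer.py -k max_bytes.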