Skip to content

Commit

Permalink
Merge pull request #105 from Telecominfraproject/dev-basic-msg-relay-…
Browse files Browse the repository at this point in the history
…test

Dev basic msg relay test
  • Loading branch information
Cahb authored Dec 10, 2024
2 parents 3326b12 + 7d1588c commit cd35ca6
Show file tree
Hide file tree
Showing 7 changed files with 669 additions and 3 deletions.
5 changes: 4 additions & 1 deletion src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,10 @@ impl CGWConnectionServer {
debug!("Received {num_of_msg_read} messages from NB API, processing...");

let partition_mapping = self.nb_api_client.get_partition_to_local_shard_mapping();
debug!("Kafka partitions idx:key mapping info: {:?}", partition_mapping);
debug!(
"Kafka partitions idx:key mapping info: {:?}",
partition_mapping
);
if !partition_mapping.is_empty() {
partition_array_idx += 1;
if partition_array_idx >= partition_mapping.len() {
Expand Down
15 changes: 15 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from client_simulator.src.simulation_runner import Device as DeviceSimulator
from kafka_producer.src.producer import Producer as KafkaProducer
from kafka_producer.src.consumer import Consumer as KafkaConsumer
from kafka_producer.src.admin import Admin as KafkaAdmin
from psql_client.psql_client import PostgreSQLClient as PSQLClient
from redis_client.redis_client import RedisClient as RedisClient
import requests
Expand All @@ -26,6 +27,10 @@ def default_kafka_group() -> str:
def default_shard_id() -> int:
return 0

@staticmethod
def default_producer_topic() -> str:
return 'CnC'

def __init__(self):
device = DeviceSimulator(
mac=self.default_dev_sim_mac(),
Expand Down Expand Up @@ -54,9 +59,11 @@ def __init__(self):

producer = KafkaProducer(db='localhost:9092', topic='CnC')
consumer = KafkaConsumer(db='localhost:9092', topic='CnC_Res', consumer_timeout=12000)
admin = KafkaAdmin(host='localhost', port=9092)

self.kafka_producer = producer
self.kafka_consumer = consumer
self.kafka_admin = admin

psql_client = PSQLClient(host="localhost", port=5432, database="cgw", user="cgw", password="123")
self.psql_client = psql_client
Expand Down Expand Up @@ -91,6 +98,7 @@ def test_context():

ctx.kafka_producer.disconnect()
ctx.kafka_consumer.disconnect()
ctx.kafka_admin.disconnect()

ctx.psql_client.disconnect()
ctx.redis_client.disconnect()
Expand Down Expand Up @@ -119,6 +127,13 @@ def kafka_probe(test_context):
# We have to clear any messages before we can work with kafka
test_context.kafka_consumer.flush()

@pytest.fixture(scope='function')
def kafka_admin_probe(test_context):
try:
test_context.kafka_admin.connect()
except:
raise Exception('Failed to connect to Kafka broker!')

@pytest.fixture(scope='function')
def device_sim_connect(test_context):
# Make sure we initiate connect;
Expand Down
2 changes: 2 additions & 0 deletions tests/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ ln -sf ../utils/client_simulator/sim_data sim_data
ln -sf ../utils/kafka_producer/kafka_data kafka_data
ln -sf ../utils/cert_generator/certs/client/ certs
ln -sf ../utils/cert_generator/certs/ca/ ca-certs
ln -sf ../utils/client_simulator/ client_simulator
ln -sf ../utils/kafka_producer/ kafka_producer
ln -sf ../utils/psql_client/ psql_client
ln -sf ../utils/redis_client/ redis_client

Expand Down
Loading

0 comments on commit cd35ca6

Please sign in to comment.