Skip to content

Commit

Permalink
Merge pull request #107 from Telecominfraproject/dev-enhance-nb-replys
Browse files Browse the repository at this point in the history
Dev enhance nb replys
  • Loading branch information
Cahb authored Dec 11, 2024
2 parents 1821404 + 840b9a9 commit 3f2b422
Show file tree
Hide file tree
Showing 11 changed files with 372 additions and 43 deletions.
11 changes: 7 additions & 4 deletions src/cgw_app_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,21 +526,25 @@ pub struct AppArgs {
/// Topomap featue status (enabled/disabled)
pub feature_topomap_enabled: bool,

/// Utilize TLS connection with NB infrastructure (Redis, PostgreSQL)
pub nb_infra_tls: bool,

/// CGW Websocket args
pub wss_args: CGWWSSArgs,

/// CGW GRPC args
pub grpc_args: CGWGRPCArgs,

/// CGW Kafka args
pub kafka_args: CGWKafkaArgs,

/// CGW DB args
pub db_args: CGWDBArgs,

/// CGW Redis args
pub redis_args: CGWRedisArgs,

/// CGW Metrics args
pub metrics_args: CGWMetricsArgs,

/// CGW Validation schema URI args
pub validation_schema: CGWValidationSchemaArgs,
}

Expand Down Expand Up @@ -632,7 +636,6 @@ impl AppArgs {
log_level,
cgw_id,
feature_topomap_enabled,
nb_infra_tls,
wss_args,
grpc_args,
kafka_args,
Expand Down
4 changes: 1 addition & 3 deletions src/cgw_connection_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,17 @@ pub struct CGWConnectionProcessor {
cgw_server: Arc<CGWConnectionServer>,
pub serial: MacAddress,
pub addr: SocketAddr,
pub idx: i64,
pub group_id: i32,
pub feature_topomap_enabled: bool,
pub device_type: CGWDeviceType,
}

impl CGWConnectionProcessor {
pub fn new(server: Arc<CGWConnectionServer>, conn_idx: i64, addr: SocketAddr) -> Self {
pub fn new(server: Arc<CGWConnectionServer>, addr: SocketAddr) -> Self {
let conn_processor: CGWConnectionProcessor = CGWConnectionProcessor {
cgw_server: server.clone(),
serial: MacAddress::default(),
addr,
idx: conn_idx,
group_id: 0,
feature_topomap_enabled: server.feature_topomap_enabled,
// Default to AP, it's safe, as later-on it will be changed
Expand Down
30 changes: 27 additions & 3 deletions src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,19 @@ impl CGWConnectionServer {
Some(val) => val,
None => {
warn!("Failed to parse recv msg with key {key}, discarded!");

if let Ok(resp) = cgw_construct_infra_enqueue_response(
self.local_cgw_id,
Uuid::default(),
false,
Some(format!("Failed to parse NB API message with key {key}")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(-1, resp);
} else {
error!("Failed to construct device_enqueue message!");
}

continue;
}
};
Expand Down Expand Up @@ -1529,6 +1542,18 @@ impl CGWConnectionServer {
}
}
} else {
if let Ok(resp) = cgw_construct_infra_enqueue_response(
self.local_cgw_id,
Uuid::default(),
false,
Some(format!("Failed to parse NB API message with key {key}")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(-1, resp);
} else {
error!("Failed to construct device_enqueue message!");
}

error!("Failed to parse msg from NBAPI (malformed?)!");
continue;
}
Expand Down Expand Up @@ -1876,8 +1901,7 @@ impl CGWConnectionServer {
self: Arc<Self>,
socket: TcpStream,
tls_acceptor: tokio_rustls::TlsAcceptor,
addr: SocketAddr,
conn_idx: i64,
addr: SocketAddr
) {
// Only ACK connection. We will either drop it or accept it once processor starts
// (we'll handle it via "mailbox" notify handle in process_internal_mbox)
Expand All @@ -1900,7 +1924,7 @@ impl CGWConnectionServer {
};

let allow_mismatch = server_clone.allow_mismatch;
let conn_processor = CGWConnectionProcessor::new(server_clone, conn_idx, addr);
let conn_processor = CGWConnectionProcessor::new(server_clone, addr);
if let Err(e) = conn_processor
.start(tls_stream, client_cn, allow_mismatch)
.await
Expand Down
2 changes: 2 additions & 0 deletions src/cgw_remote_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,8 @@ impl CGWRemoteDiscovery {
"Failed to relay message! CGW{} seems to be unreachable at [{}:{}]! Error: {}",
shard_id, cl.shard.server_host, cl.shard.server_port, e
);

return Err(e);
}

return Ok(());
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ async fn server_loop(app_core: Arc<AppCore>) -> Result<()> {

app_core_clone.conn_ack_runtime_handle.spawn(async move {
cgw_server_clone
.ack_connection(socket, tls_acceptor_clone, remote_addr, conn_idx)
.ack_connection(socket, tls_acceptor_clone, remote_addr)
.await;
});

Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,10 @@ def kafka_default_infra_group(test_context):
@pytest.fixture(scope='function')
def kafka_default_infra(test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Cannot create default infra: kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Cannot create default infra: kafka consumer is not connected to Kafka'

uuid_val = random.randint(1, 100)
default_group = test_context.default_kafka_group()
Expand Down
32 changes: 16 additions & 16 deletions tests/test_cgw_infra_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ class TestCgwInfraGroup:
"psql_probe")
def test_single_infra_group_add_del(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Kafka consumer is not connected to Kafka'

default_shard_id = test_context.default_shard_id()

Expand Down Expand Up @@ -113,10 +113,10 @@ def test_single_infra_group_add_del(self, test_context):
"psql_probe")
def test_multiple_infra_group_add_del(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Kafka consumer is not connected to Kafka'

default_shard_id = test_context.default_shard_id()

Expand Down Expand Up @@ -223,10 +223,10 @@ def test_multiple_infra_group_add_del(self, test_context):
"psql_probe")
def test_create_existing_infra_group(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Kafka consumer is not connected to Kafka'

default_shard_id = test_context.default_shard_id()

Expand Down Expand Up @@ -345,10 +345,10 @@ def test_create_existing_infra_group(self, test_context):
"psql_probe")
def test_remove_not_existing_infra_group(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Kafka consumer is not connected to Kafka'

default_shard_id = test_context.default_shard_id()

Expand Down Expand Up @@ -394,10 +394,10 @@ def test_remove_not_existing_infra_group(self, test_context):
"psql_probe")
def test_single_infra_group_add_del_to_shard(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Kafka consumer is not connected to Kafka'

default_shard_id = test_context.default_shard_id()

Expand Down Expand Up @@ -493,10 +493,10 @@ def test_single_infra_group_add_del_to_shard(self, test_context):
"psql_probe")
def test_multiple_infra_group_add_del_to_shard(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Kafka consumer is not connected to Kafka'

default_shard_id = test_context.default_shard_id()

Expand Down Expand Up @@ -603,10 +603,10 @@ def test_multiple_infra_group_add_del_to_shard(self, test_context):
"psql_probe")
def test_single_infra_group_add_to_not_existing_shard(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Kafka consumer is not connected to Kafka'

default_shard_id = test_context.default_shard_id()

Expand Down Expand Up @@ -663,10 +663,10 @@ def test_single_infra_group_add_to_not_existing_shard(self, test_context):
"psql_probe")
def test_infra_group_capacity_overflow(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Kafka consumer is not connected to Kafka'

default_shard_id = test_context.default_shard_id()

Expand Down
28 changes: 14 additions & 14 deletions tests/test_cgw_infras.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ class TestCgwInfra:
"psql_probe")
def test_single_infra_add_del(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Kafka consumer is not connected to Kafka'

default_shard_id = test_context.default_shard_id()

Expand Down Expand Up @@ -201,10 +201,10 @@ def test_single_infra_add_del(self, test_context):
"redis_probe")
def test_single_infra_add_not_existing_group(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Kafka consumer is not connected to Kafka'

default_shard_id = test_context.default_shard_id()

Expand Down Expand Up @@ -253,10 +253,10 @@ def test_single_infra_add_not_existing_group(self, test_context):
"redis_probe")
def test_single_infra_del_not_existing_group(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Kafka consumer is not connected to Kafka'

default_shard_id = test_context.default_shard_id()

Expand Down Expand Up @@ -306,10 +306,10 @@ def test_single_infra_del_not_existing_group(self, test_context):
"psql_probe")
def test_single_infra_del_existing_group_not_existing_infra(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Kafka consumer is not connected to Kafka'

default_shard_id = test_context.default_shard_id()

Expand Down Expand Up @@ -437,10 +437,10 @@ def test_single_infra_del_existing_group_not_existing_infra(self, test_context):
"psql_probe")
def test_multiple_infras_add_del(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Kafka consumer is not connected to Kafka'

default_shard_id = test_context.default_shard_id()

Expand Down Expand Up @@ -635,10 +635,10 @@ def test_multiple_infras_add_del(self, test_context):
"psql_probe")
def test_infras_capacity_overflow(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Kafka consumer is not connected to Kafka'

default_shard_id = test_context.default_shard_id()

Expand Down Expand Up @@ -887,10 +887,10 @@ def test_infras_capacity_overflow(self, test_context):
"psql_probe")
def test_partial_infras_add(self, test_context):
assert test_context.kafka_producer.is_connected(),\
f'Cannot create default group: kafka producer is not connected to Kafka'
f'Kafka producer is not connected to Kafka'

assert test_context.kafka_consumer.is_connected(),\
f'Cannot create default group: kafka consumer is not connected to Kafka'
f'Kafka consumer is not connected to Kafka'

default_shard_id = test_context.default_shard_id()

Expand Down
Loading

0 comments on commit 3f2b422

Please sign in to comment.