refactor: Improve c8y_mapper_ext operation handler API #2997
Conversation
```rust
// @@ -82,6 +81,7 @@ pub fn succeed_operation(
#[derive(Debug, Copy, Clone)]
pub enum CumulocitySupportedOperations {
    C8ySoftwareList,
```
`c8y_SoftwareList` is a very old operation, replaced by `c8y_SoftwareUpdate` a long time ago. If we need this, something is wrong (likely confusion with the inventory fragment name `c8y_SoftwareList`).
Ah, I didn't know that; it seemed weird that it was missing, now I know the reason :) I'll replace it with a proper usage of `c8y_SoftwareUpdate`.
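For illustration, the variant would presumably end up looking something like this, assuming the enum keeps the shape shown in the diff above:

```rust
#[derive(Debug, Copy, Clone)]
pub enum CumulocitySupportedOperations {
    // replaces the erroneously added C8ySoftwareList variant
    C8ySoftwareUpdate,
    // ...the remaining supported operations
}
```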
```rust
#[tokio::test]
async fn mapper_converts_config_metadata_to_supported_op_and_types_for_main_device() {
```
This test has been moved to `tests.rs`, while other tests have been moved out of `tests.rs` and dispatched over misc `xxx.rs` files. We should really avoid doing that in a refactoring PR. Reorganize either the code or the tests, not both simultaneously. This makes things harder to understand and check, for little benefit.
Approved. This is a really nice simplification of the operation handlers.
```rust
Ok(OperationResult::Finished {
    messages: vec![
        MqttMessage::new(topic, smartrest_set_operation),
        self.request_software_list(&target.topic_id),
```
Thanks to this refactoring PR, there is now an opportunity to remove the software list request from here, as command finalization is now done in one place.

Unfortunately, this is not enough to remove the need for an id generator clone (currently both the converter and the running operation for the software update need to generate ids). An option could be to delegate the generation of operation ids to the `OperationHandler`.
> Thanks to this refactoring PR, there is now an opportunity to remove the software list request from here, as command finalization is now done in one place
I'm not sure if removing the `software_list` publish from `software_update` is practical:

- A local `software_update` operation is published on the local MQTT broker in `c8y_mapper_ext/src/converter.rs:770`, in the function `forward_software_request`.
- This local `software_update` operation is picked up by the `OperationHandler`, which runs `c8y_mapper_ext::operations::software_update::OperationContext::publish_software_update_status`.
- `publish_software_update_status` returns a "succeed operation" message with no payload, and invokes a local `software_list` operation after the `software_update` operation has succeeded or failed.
- `publish_software_list`, when it's successful, sends the software list via the HTTP proxy or SmartREST.

What I'm seeing is that `publish_software_list` isn't an operation, and it can be invoked anytime; it's just that after `software_update` the software list changes, so it has to be updated. `software_list` isn't necessary to finalize the `software_update` operation (we send the success/failure of `software_update` and the invocation of `software_list` together), and `software_update` is the only operation that does it, so it makes little sense to extract it.
The only thing that's problematic is sharing this id generator, because generating the ids is necessary for creating local operations. Right now, creating local operations is also spread among methods in `CumulocityConverter`: `forward_software_request`, `forward_restart_request`, `forward_operation_request`, but also `convert_log_upload_request`, `convert_config_snapshot_request`, etc. in the `operations` module. They all do the same thing (converting c8y operation messages to local thin-edge operation messages), so they should be put together in a single place, either in `OperationHandler` or in some other object with a smaller scope. It would then contain the `IdGenerator` and provide methods to start new local operations by publishing on the local operation topics. `software_update` would then simply call this to create a new `software_list` operation instead of doing it manually.
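A rough sketch of the idea; `LocalOperationStarter` and its method are hypothetical names, not actual thin-edge API, and the `te/.../cmd/...` topic scheme is taken from the command topics used elsewhere in this PR:

```rust
use serde_json::Value;
use tedge_actors::{ChannelError, LoggingSender, Sender};
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_mqtt_ext::{MqttMessage, Topic};

/// Hypothetical single owner of the IdGenerator, starting local thin-edge
/// operations by publishing on the local command topics.
struct LocalOperationStarter {
    id_generator: IdGenerator, // path and API of IdGenerator assumed
    mqtt_publisher: LoggingSender<MqttMessage>,
}

impl LocalOperationStarter {
    /// e.g. operation = "software_list", payload = initial command state.
    async fn start_operation(
        &mut self,
        target: &EntityTopicId,
        operation: &str,
        payload: Value,
    ) -> Result<String, ChannelError> {
        let cmd_id = self.id_generator.new_id(); // method name assumed
        let topic = Topic::new_unchecked(&format!("te/{target}/cmd/{operation}/{cmd_id}"));
        self.mqtt_publisher
            .send(MqttMessage::new(&topic, payload.to_string()))
            .await?;
        Ok(cmd_id)
    }
}
```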
> What I'm seeing is that `publish_software_list` isn't an operation, and it can be invoked anytime; it's just that after `software_update` the software list changes, so it has to be updated. `software_list` isn't necessary to finalize the `software_update` operation (we send the success/failure of `software_update` and the invocation of `software_list` together), and `software_update` is the only operation that does it, so it makes little sense to extract it.
Yes, sending a software list after a software update is not strictly required by c8y and is always done at the device's initiative. This is done in two cases: when the software update capability is added, and after each software update request.

Technically, I see no reason to have this done inside the software update handler rather than elsewhere. My concern is that there should be only one component that triggers operations on behalf of c8y.

> The only thing that's problematic is sharing this id generator, because generating the ids is necessary for creating local operations.

We are fully aligned here.
> Right now, creating local operations is also spread among methods in `CumulocityConverter`: `forward_software_request`, `forward_restart_request`, `forward_operation_request`, but also `convert_log_upload_request`, `convert_config_snapshot_request`, etc. in the `operations` module. They all do the same thing (converting c8y operation messages to local thin-edge operation messages), so they should be put together in a single place, either in `OperationHandler` or in some other object with a smaller scope. It would then contain the `IdGenerator` and provide methods to start new local operations by publishing on the local operation topics. `software_update` would then simply call this to create a new `software_list` operation instead of doing it manually.
As you say: they should be put together in a single place, either in `OperationHandler` or in some other object. I'm looking for something symmetric:

- the object creating an operation request should also be the object that clears that operation request when done
- the `OperationHandler` seems to be a nice candidate for that: when clearing a software update, this object could also possibly trigger a following software list request
- in case you wonder: I'm not requesting you to do that in this PR
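A hypothetical sketch of that symmetry, building on the starter object sketched earlier; none of these names are actual thin-edge API, and the convention of clearing a retained command by publishing an empty retained message is assumed:

```rust
use tedge_actors::{ChannelError, Sender};
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_mqtt_ext::{MqttMessage, Topic};

impl LocalOperationStarter {
    /// Hypothetical counterpart to start_operation: the same object that
    /// created the command also clears it when it's done.
    async fn clear_operation(
        &mut self,
        target: &EntityTopicId,
        operation: &str,
        cmd_id: &str,
    ) -> Result<(), ChannelError> {
        let topic = Topic::new_unchecked(&format!("te/{target}/cmd/{operation}/{cmd_id}"));
        // clearing a retained command message = publishing an empty retained payload
        self.mqtt_publisher
            .send(MqttMessage::new(&topic, "").with_retain())
            .await?;
        // the policy lives in one place: clearing a software_update
        // triggers the follow-up software_list request
        if operation == "software_update" {
            self.start_operation(target, "software_list", serde_json::json!({"status": "init"}))
                .await?;
        }
        Ok(())
    }
}
```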
Observed for the second time since yesterday on the merge queue:
LGTM. I'm still not able to make up my mind on what the right way to handle channel errors emitted from these spawned tasks is. But since the current mechanism also works, approving.
```rust
mqtt_publisher
    .send(c8y_state_executing_message)
    .await
    .unwrap();
```
I believe you're doing these unwraps here because even in the case of a failure, it'll only terminate the spawned tasks and not affect the mapper itself. And these terminated tasks will be removed from `running_operations` when the next message for that operation type is handled.

I'm just wondering if we should have propagated these failures to the mapper, so that the runtime can shut down the mapper with the hope of a recovery from that broken state. It may not be such a big deal, because if the sends through this publisher are failing in these tasks, then sends directly from the mapper are also likely to fail, which will eventually shut down the mapper anyway.
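For illustration, a minimal sketch of what that propagation could look like, assuming the task body is changed to return the error instead of unwrapping (types like `LoggingSender` are assumed from context; the point is only the `?` in place of `.unwrap()`):

```rust
use tedge_actors::{ChannelError, LoggingSender, Sender};
use tedge_mqtt_ext::MqttMessage;

async fn send_executing(
    mqtt_publisher: &mut LoggingSender<MqttMessage>,
    c8y_state_executing_message: MqttMessage,
) -> Result<(), ChannelError> {
    // a send failure now surfaces as an Err from the spawned task,
    // which whoever joins the JoinHandle can act on, instead of a panic
    mqtt_publisher.send(c8y_state_executing_message).await?;
    Ok(())
}
```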
> And these terminated tasks will be removed from `running_operations` when the next message for that operation type is handled.
Not the next message for that operation type, but the next message for that particular operation, with that particular id.

But I think there's a problem: in practice, when a panic in a child task happens, `.handle()` will never get called for that operation again. The operation task itself sends the messages, so if they can't be sent over MQTT because of a panic, they won't be handled, won't be joined, so we will not see that an exception has occurred.

For now, in this PR I will add a test to verify the behaviour of the `handle` function: when does it panic and when does it actually join the tasks.

But what I realize is that `OperationHandler` needs its own background task that joins the operation tasks (i.e. polls the next future in `FuturesUnordered`), which will have to be added, or I'll have to make `OperationHandler` an actor, which I probably should. So far I've avoided it because I wanted `OperationHandler` to be a small object used and managed by the converter, not another actor, and also because our actors tend to have a lot of boilerplate, but the time for that is probably now and it's what I'll have to do in a follow-up PR.

So AFAIK right now we don't actually join the panicked tasks and don't know that a panic has occurred. This is the first thing that I should fix. Then, I'll need to make sure tasks are joined automatically so `.handle()` doesn't have to be called to know whether or not there's been a panic. I leave to you the decision of whether these changes should be made in a follow-up PR or in this one.
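A minimal sketch of what that background joiner could look like, assuming newly spawned tasks are handed over on a channel (`tokio` + `futures`; all names are illustrative, not this PR's code):

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

// Joins operation tasks as they finish, so a panic is observed even if no
// further message for that operation id ever arrives.
async fn join_operation_tasks(mut new_tasks: mpsc::Receiver<JoinHandle<()>>) {
    let mut running: FuturesUnordered<JoinHandle<()>> = FuturesUnordered::new();
    loop {
        tokio::select! {
            Some(handle) = new_tasks.recv() => running.push(handle),
            Some(result) = running.next() => {
                if let Err(err) = result {
                    if err.is_panic() {
                        // surface the panic instead of silently losing the task
                        eprintln!("operation task panicked: {err}");
                    }
                }
            }
            else => break, // channel closed and no running tasks left
        }
    }
}
```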
Improved in 087e968, should clarify things for now, and could be improved in a follow-up PR (#2880 (comment)).
> I'll have to make `OperationHandler` an actor, which I probably should
Yeah, that's pretty much aligned with the model that I had in mind too, especially after the introduction of the availability actor.
> but the time for that is probably now and it's what I'll have to do in a follow-up PR.
That's fine. One step at a time.
Tests that verify the execution of the operations (i.e. where we generate an operation with a given id and verify its outputs) were moved into their own modules. Handling an operation usually consists of 2 things:

1. convert a Cumulocity SmartREST message to a local MQTT operation message and back
2. handle the local MQTT operation message

These steps are performed by different components, so the unit tests are also separated.

Signed-off-by: Marcel Guzik <[email protected]>
This `From` impl was in c8y_api, which shouldn't know anything about tedge_api and also had a bug (C8ySoftwareUpdate => ConfigSnapshot). There was only one callsite, where we should've used `OperationType` directly anyway. Signed-off-by: Marcel Guzik <[email protected]>
Signed-off-by: Marcel Guzik <[email protected]>
More precisely, the `to_response` function takes the operation result and converts it to a Cumulocity response. The handlers of different operations now don't have to concern themselves with creating a Cumulocity response; they can just return the error directly.

Signed-off-by: Marcel Guzik <[email protected]>
1. Changed error type from `ConversionError` to `OperationError`, which displays its source error in a single line
2. Added context to some operation error cases

Signed-off-by: Marcel Guzik <[email protected]>
…d uploads are ongoing Signed-off-by: Marcel Guzik <[email protected]>
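For the `to_response` commit above, a rough sketch of the shape being described; `OperationResult` and `CumulocitySupportedOperations` appear in this PR's diffs, while `fail_operation_message` is a hypothetical helper standing in for however the SmartREST failure message is actually built:

```rust
// Handlers return Result<OperationResult, OperationError>; converting a
// failure into a Cumulocity "failed" response happens here, in one place.
fn to_response(
    result: Result<OperationResult, OperationError>,
    operation: CumulocitySupportedOperations,
    smartrest_publish_topic: &Topic,
) -> OperationResult {
    match result {
        Ok(result) => result,
        Err(err) => OperationResult::Finished {
            messages: vec![fail_operation_message(
                smartrest_publish_topic,
                operation,
                // OperationError displays its source error in a single line
                &err.to_string(),
            )],
        },
    }
}
```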
```rust
#[tokio::test]
async fn mapper_processes_other_operations_while_uploads_and_downloads_are_ongoing() {
    let cfg_dir = TempTedgeDir::new();
    let TestHandle {
        mqtt,
        http,
        dl,
        ul,
        mut timer,
        ..
    } = spawn_c8y_mapper_actor(&cfg_dir, true).await;
    let mut dl = dl.with_timeout(TEST_TIMEOUT_MS);
    let mut ul = ul.with_timeout(TEST_TIMEOUT_MS);

    spawn_dummy_c8y_http_proxy(http);

    // Complete sync phase so that alarm mapping starts
    trigger_timeout(&mut timer).await;

    let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS);
    skip_init_messages(&mut mqtt).await;

    // simulate many successful operations that need to be handled by the mapper
    mqtt.send(MqttMessage::new(
        &Topic::new_unchecked("te/device/main///cmd/log_upload/c8y-mapper-1"),
        json!({
            "status": "successful",
            "tedgeUrl": "http://localhost:8888/tedge/file-transfer/test-device/log_upload/c8y-mapper-1",
            "type": "mosquitto",
            "dateFrom": "2023-11-28T16:33:50+01:00",
            "dateTo": "2023-11-29T16:33:50+01:00",
            "searchText": "ERROR",
            "lines": 1000
        })
        .to_string(),
    ))
    .await
    .unwrap();

    let (download_id, download_request) = dl
        .recv()
        .await
        .expect("DownloadRequest for log_upload should be sent");
    assert_eq!(download_id, "c8y-mapper-1");
    assert_eq!(
        download_request.url,
        "http://localhost:8888/tedge/file-transfer/test-device/log_upload/c8y-mapper-1"
    );

    // here it would be good to assert that upload message hasn't been sent yet, but due to the
    // behaviour of message channels it can't be easily done

    dl.send((
        "c8y-mapper-1".to_string(),
        Ok(DownloadResponse {
            url: "http://localhost:8888/tedge/file-transfer/test-device/log_upload/c8y-mapper-1"
                .to_string(),
            file_path: "whatever".into(),
        }),
    ))
    .await
    .unwrap();

    let (upload_id, _) = ul
        .recv()
        .await
        .expect("UploadRequest for log_upload should be sent");
    assert_eq!(upload_id, "c8y-mapper-1");

    // now that an upload is ongoing, check that downloads can also be triggered
    mqtt.send(MqttMessage::new(
        &Topic::new_unchecked("te/device/main///cmd/config_snapshot/c8y-mapper-2"),
        json!({
            "status": "successful",
            "tedgeUrl": "http://localhost:8888/tedge/file-transfer/test-device/config_snapshot/c8y-mapper-2",
            "type": "typeA",
        })
        .to_string(),
    ))
    .await
    .unwrap();

    let (config_snapshot_id, _) = dl
        .recv()
        .await
        .expect("DownloadRequest for config snapshot should be sent");
    assert_eq!(config_snapshot_id, "c8y-mapper-2");

    // while download and upload are ongoing, try some other operation that doesn't do download or upload
    mqtt.send(MqttMessage::new(
        &Topic::new_unchecked("te/device/main///cmd/restart/c8y-mapper-3"),
        json!({
            "status": "successful",
        })
        .to_string(),
    ))
    .await
    .unwrap();

    assert_received_contains_str(&mut mqtt, [("c8y/s/us", "503,c8y_Restart")]).await;
}
```
@albinsuresh My bad, by mistake I included the test you were proposing (#2904 (comment)) in this PR, but since it's already here, you can take a look.
Looks fine.
LGTM. Just some minor cosmetic comments.
Signed-off-by: Marcel Guzik <[email protected]>
TODO

- Stop parsing the operation message payload twice. First parse it as a generic operation, and then only parse the rest of the payload. A bit complicated to do right now; it would require `Command<Payload>` and `GenericCommandState` to be refactored, better to do it in another PR.
- `handle` function (refactor: Improve c8y_mapper_ext operation handler API #2997 (comment))

Proposed changes

- … (`ChannelError` when trying to send a message through a channel) would not …
- … `EXECUTING` and `FAILED` messages out to the main handler.

Types of changes
Paste Link to the issue

Checklist

- `cargo fmt` as mentioned in CODING_GUIDELINES
- `cargo clippy` as mentioned in CODING_GUIDELINES

Further comments
This PR's focus is mainly removing unnecessary assumptions about the MQTT API from operation functions and centralizing them in the main handler. This makes the code simpler to understand, as it provides a more rigid structure and a simpler interface, and serves as a stepping stone to the eventual extraction of clearing messages to the same place where we're initiating local operations. Once operation initialization and clearing messages are in the same place, we can move the state that keeps track of which operations are running there, and stop relying on the MQTT broker echoing command id messages back to the mapper for it to handle them.
Also, the PR contains one new test that checks some paths in the `OperationHandler::handle` method. This test does not utilize sending raw MQTT messages through the actor message boxes. I plan to add more tests like this for individual operations that more thoroughly test the possible branches.

Finally, the PR tackles the "clean up operation API" section of #2880 (comment), but doesn't fully close the task. I'd like to handle the `SUCCESSFUL` messages in another PR, to keep this one relatively small.