Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pluggable Streamer Prototype in Java #85

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/main/java/org/eclipse/uprotocol/streamer/UStreamer.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ public UStatus addForwardingRule(Route in, Route out) {
}

TransportListener listener = new TransportListener(in, out);
UUri uri = UUri.newBuilder().setAuthority(out.getAuthority()).build();
UUri out_uri = UUri.newBuilder().setAuthority(out.getAuthority()).build();

UStatus result = in.getTransport().registerListener(uri, listener);
UStatus result = in.getTransport().registerListener(out_uri, listener);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am recalling @sophokles73's feedback from about a week ago as I was implementing up-streamer-rust.

It seems to me this would nab you the Request, Response, and Notification messages headed to out.getAuthority(), but not Publish messages originating from in.getAuthority().

I think we may need to modify:

UStatus result = in.getTransport().registerListener(out_uri, request_response_notification_listener);

and add:

UUri in_uri = UUri.newBuilder().setAuthority(in.getAuthority()).build();
UStatus result = in.getTransport().registerListener(in_uri, publish_listener);

and make some corresponding changes to TransportListener to allow us to configure it for dropping Publish messages or all non-Publish messages.

Still tooling around with this over on up-streamer-rust, but wanted to make this comment before I forget 🙂


if (result.getCode() != UCode.OK) {
return result;
Expand Down Expand Up @@ -112,6 +112,13 @@ public UStatus deleteForwardingRule(Route in, Route out) {
return UStatus.newBuilder().setCode(UCode.INVALID_ARGUMENT).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

who cares? if you can't delete, if there is nothing to delete, it is fine.
delete means I dont want something to be there anymore. If it is not possible then just say ok, deleted.
IMHO, you only give an error when something is there and you could not delete it, meaning I wanted it gone and it is still there. All other cases are, ok - it is not there anymore.
This is from a developer perspective.

}


// Unregister the listener with the transport
listeners.stream()
.filter(p -> p.getInputRoute().equals(in) && p.getOutputRoute().equals(out))
.forEach(p -> in.getTransport().unregisterListener(UUri.newBuilder().setAuthority(out.getAuthority()).build(), p));
stevenhartley marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if unregister fails?
can unregister block the thread?
just a feeling in my tummy that this can end up with race conditions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking behaviour of the unregister method depends on its implementation
If one thread is unregistering listener while another is modifying the list, it can lead to race conditions.
We can use synchronization and an ExecutorService for async operations for added safety.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now this is only pseudo code and not intended to be used to build an actual streamer.


// Remove the listener from the list
if (listeners.removeIf(p -> p.getInputRoute().equals(in) && p.getOutputRoute().equals(out))) {
return UStatus.newBuilder().setCode(UCode.OK).build();
}
Expand Down
Loading