-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pluggable Streamer Prototype in Java #85
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really slick! 🙂
I like the concept. I'll think on how to apply a similar clean pattern to up-streamer-rust
* | ||
*/ | ||
public class Route { | ||
private UAuthority authority; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can mark these as final
// Unregister the listener with the transport | ||
listeners.stream() | ||
.filter(p -> p.getInputRoute().equals(in) && p.getOutputRoute().equals(out)) | ||
.forEach(p -> in.getTransport().unregisterListener(UUri.newBuilder().setAuthority(out.getAuthority()).build(), p)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if unregister fails?
can unregister block the thread?
just a feeling in my tummy that this can end up with race conditions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Blocking behaviour of the unregister method depends on its implementation
If one thread is unregistering listener while another is modifying the list, it can lead to race conditions.
We can use synchronization and an ExecutorService for async operations for added safety.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now this is only pseudo code and not intended to be used to build an actual streamer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor recommendation to use synchronized block, but I'm approving the PR as this code is only for demonstration purposes
// Unregister the listener with the transport | ||
listeners.stream() | ||
.filter(p -> p.getInputRoute().equals(in) && p.getOutputRoute().equals(out)) | ||
.forEach(p -> in.getTransport().unregisterListener(UUri.newBuilder().setAuthority(out.getAuthority()).build(), p)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Blocking behaviour of the unregister method depends on its implementation
If one thread is unregistering listener while another is modifying the list, it can lead to race conditions.
We can use synchronization and an ExecutorService for async operations for added safety.
TransportListener listener = new TransportListener(in, out); | ||
UUri out_uri = UUri.newBuilder().setAuthority(out.getAuthority()).build(); | ||
|
||
UStatus result = in.getTransport().registerListener(out_uri, listener); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am recalling @sophokles73's feedback from about a week ago as I was implementing up-streamer-rust
.
It seems to me this would nab you the Request, Response, and Notification messages headed to out.getAuthority()
, but not Publish messages originating from in.getAuthority()
.
I think we may need to modify:
UStatus result = in.getTransport().registerListener(out_uri, request_response_notification_listener);
and add:
UUri in_uri = UUri.newBuilder().setAuthority(in.getAuthority()).build();
UStatus result = in.getTransport().registerListener(in_uri, publish_listener);
and make some corresponding changes to TransportListener
to allow us to configure it for dropping Publish messages or all non-Publish messages.
Still tooling around with this over on up-streamer-rust
, but wanted to make this comment before I forget 🙂
Static routing rules work for messages that have a sink but messages that do not have sink we have to look up to uSubscription for only a subset of topics that we (the streamer) need to listen to so that we don't get everything).
…n the pluggable streamer.
* ned to call this API if its database is flushed or corrupted (ex. factory | ||
* reset). | ||
* **__NOTE:__** This is a private API only for uSubscription services, | ||
* uEs can call Unsubscribe() to flush their own subscriptions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is not intended to be called by client developers then it should not be here.
Adding documentation to explain interfaces means that they are not intention revealing and there is nothing stopping anyone from calling this API.
IMHO, it should not be here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll reply in the other comment (they are related).
import org.eclipse.uprotocol.v1.UStatus; | ||
import org.eclipse.uprotocol.core.usubscription.v3.*; | ||
|
||
/* The following is the uSubscription API declared as an interface so it could be easily |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this any different from all the other protobuf API's?
not that I am against an interface, I am just curious as to why this is suddenly an interface while the others are protobuf "interface API"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @tamarafischer (and @AnotherDaniel). The reason why I added this here is because of the following reasons/issues:
- We do not have code generators from proto yet
- code generators create implementations not interfaces, this is problematic if I want to build generic uEs that are not dependent on an implementation (i.e. the pluggable streamer or any uE that will use uSubscription in general and not depend on say a specific up-client implementation).
- USubscription has always been special in that it will not always be 1:1 mapping from the generated code, we will always have to tweak it for the specific implementation so it is better to be a language specific interface like UTransport and RpcClient.
Above is a major shift from current PoR but we need the interface urgently to unblock everyone while we work out tooling for all the other uServices.
*/ | ||
class TransportListener implements UListener { | ||
|
||
private Route in; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we have a curcular reference here.
UListener is used by UTransport.
TransportListener has 2 Route objects that also contain a UTransport
How does this work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we need to rethink the circular dependency but like I mention this is only demo code and NOT intended to be used to implement any up-streamer library.
* @param request the request containing the topic, subscriber name, and subscription attributes | ||
* @return the response containing the status of the request and any event delivery configuration | ||
*/ | ||
SubscriptionResponse subscribe(SubscriptionRequest request); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are these methods always executed within the application process? Is there no IO involved?
If there is I/O the I believe that we should be returning a CompletionStage
At least this way we can compose the operations
public class UStreamer { | ||
|
||
// List of listeners (routes) that the streamer listens to | ||
private List<TransportListener> listeners; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the advantage of the TransportListener?
why can't we just have a list of Route objects?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The routes are not the rules in the forwarding table, the rules are a combination of in and out Route that then build a TransportListener and all we need to keep track of is the TransportListeners.
While you're offering Steven critique feel free to check out the attempt in Rust 😉 |
} | ||
|
||
// check if the rule already exists in the list | ||
if (listeners.stream().anyMatch(p -> p.getInputRoute().equals(in) && p.getOutputRoute().equals(out))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not very efficient. An O(n) check can be a problem. If the list is small no problem. Maybe a data structure like a Set or a Map can help with managing duplicates?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to do this optimization in the demo code though... Maybe I need to move this code out of up-java and into a separate project so that it is not accidentally used by developers!!
.build(); | ||
|
||
// Get the list of subscriptions | ||
FetchSubscriptionsResponse response = submgr.fetchSubscriptions(request); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens if the subscription manager is slow - maybe under load?
what happens if the subscription manager fails? A bug or it is restarting or something?
Is this even a thing we need to handle in the car? Cloud applications must handle these cases and be a little more resilient.
Just saying that this is the happy path, subscription manager is a separate component and anything can happen.
The good thing is that you made it an interface and can now create a USubscription implementation that barfs and see how your application handles these use cases.
Objects.requireNonNull(out, "output cannot be null."); | ||
|
||
if (in.equals(out)) { | ||
return UStatus.newBuilder().setCode(UCode.INVALID_ARGUMENT).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
who cares? if you can't delete, if there is nothing to delete, it is fine.
delete means I dont want something to be there anymore. If it is not possible then just say ok, deleted.
IMHO, you only give an error when something is there and you could not delete it, meaning I wanted it gone and it is still there. All other cases are, ok - it is not there anymore.
This is from a developer perspective.
if (listeners.removeIf(p -> p.getInputRoute().equals(in) && p.getOutputRoute().equals(out))) { | ||
return UStatus.newBuilder().setCode(UCode.OK).build(); | ||
} | ||
return UStatus.newBuilder().setCode(UCode.NOT_FOUND).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
like delete, if not found and I want to remove, then I am ok with that. no need to tell me it was not found - meaning someone else already deleted it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So there is no value in returning NOT_FOUND you're saying from a developer PoV?
The following pull request implements the pluggable streamer written in Java. It is used to demonstrate what the concept is and how one would implement a streamer using only the uTransport interface. The intention of this PR is to provide example code and NOT be used for implementing an actual streamer as that would be written in C++ or rust to be useable everywhere. .