-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: GRPC Java implementation for client layer V2 (#511)
- Loading branch information
Showing
7 changed files
with
547 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<groupId>org.aneo</groupId> | ||
<artifactId>armonik-java</artifactId> | ||
<description>GRPC java binding for the Armonik orchestrator API</description> | ||
<version>0.1.0</version> | ||
|
||
<properties> | ||
<maven.compiler.source>17</maven.compiler.source> | ||
<maven.compiler.target>17</maven.compiler.target> | ||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | ||
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version> | ||
<os-maven-plugin.version>1.7.1</os-maven-plugin.version> | ||
<proto.path>../../Protos/V1</proto.path> | ||
<sl4j.version>2.0.12</sl4j.version> | ||
<logback.version>1.5.5</logback.version> | ||
</properties> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>io.grpc</groupId> | ||
<artifactId>grpc-netty</artifactId> | ||
<version>1.62.2</version> | ||
</dependency> | ||
|
||
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core --> | ||
<dependency> | ||
<groupId>io.projectreactor</groupId> | ||
<artifactId>reactor-core</artifactId> | ||
<version>3.6.5</version> | ||
</dependency> | ||
|
||
|
||
<dependency> | ||
<groupId>io.grpc</groupId> | ||
<artifactId>grpc-protobuf</artifactId> | ||
<version>1.62.2</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.grpc</groupId> | ||
<artifactId>grpc-stub</artifactId> | ||
<version>1.62.2</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.xolstice.maven.plugins</groupId> | ||
<artifactId>protobuf-maven-plugin</artifactId> | ||
<version>0.6.1</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>javax.annotation</groupId> | ||
<artifactId>javax.annotation-api</artifactId> | ||
<version>1.3.2</version> | ||
</dependency> | ||
|
||
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic --> | ||
<dependency> | ||
<groupId>ch.qos.logback</groupId> | ||
<artifactId>logback-classic</artifactId> | ||
<version>${logback.version}</version> | ||
</dependency> | ||
|
||
|
||
<dependency> | ||
<groupId>org.slf4j</groupId> | ||
<artifactId>slf4j-api</artifactId> | ||
<version>${sl4j.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.junit.jupiter</groupId> | ||
<artifactId>junit-jupiter-engine</artifactId> | ||
<version>5.10.2</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
|
||
|
||
<!-- Mockito dependencies --> | ||
<dependency> | ||
<groupId>org.mockito</groupId> | ||
<artifactId>mockito-core</artifactId> | ||
<version>5.11.0</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
|
||
<dependency> | ||
<groupId>org.mockito</groupId> | ||
<artifactId>mockito-junit-jupiter</artifactId> | ||
<version>5.11.0</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
|
||
</dependencies> | ||
|
||
|
||
<build> | ||
<extensions> | ||
<extension> | ||
<groupId>kr.motd.maven</groupId> | ||
<artifactId>os-maven-plugin</artifactId> | ||
<version>${os-maven-plugin.version}</version> | ||
</extension> | ||
</extensions> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.xolstice.maven.plugins</groupId> | ||
<artifactId>protobuf-maven-plugin</artifactId> | ||
<version>${protobuf-maven-plugin.version}</version> | ||
<configuration> | ||
<protoSourceRoot>${proto.path}</protoSourceRoot> | ||
<protocArtifact> | ||
com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier} | ||
</protocArtifact> | ||
<pluginId>grpc-java</pluginId> | ||
<pluginArtifact> | ||
io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier} | ||
</pluginArtifact> | ||
</configuration> | ||
<executions> | ||
<execution> | ||
<goals> | ||
<goal>compile</goal> | ||
<goal>compile-custom</goal> | ||
</goals> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
|
||
</project> |
185 changes: 185 additions & 0 deletions
185
packages/java/src/main/java/armonik/client/event/EventClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,185 @@ | ||
package armonik.client.event; | ||
|
||
import static armonik.api.grpc.v1.events.EventsCommon.EventsEnum.EVENTS_ENUM_NEW_RESULT; | ||
import static armonik.api.grpc.v1.events.EventsCommon.EventsEnum.EVENTS_ENUM_RESULT_STATUS_UPDATE; | ||
import static armonik.api.grpc.v1.results.ResultsFields.ResultRawEnumField.RESULT_RAW_ENUM_FIELD_RESULT_ID; | ||
|
||
import java.util.ArrayList; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import armonik.api.grpc.v1.FiltersCommon; | ||
import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionRequest; | ||
import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse; | ||
import armonik.api.grpc.v1.events.EventsGrpc; | ||
import armonik.api.grpc.v1.events.EventsGrpc.EventsBlockingStub; | ||
import armonik.api.grpc.v1.results.ResultsFields; | ||
import armonik.api.grpc.v1.results.ResultsFilters; | ||
import armonik.client.event.util.records.EventSubscriptionResponseRecord; | ||
import io.grpc.ManagedChannel; | ||
import io.grpc.stub.StreamObserver; | ||
|
||
/** | ||
* EventClient is a client for interacting with event-related functionalities. | ||
* It communicates with a gRPC server using a blocking stub to retrieve events. | ||
*/ | ||
public class EventClient { | ||
/** The blocking and nonblocking stub for communicating with the gRPC server. */ | ||
private final EventsBlockingStub eventsBlockingStub; | ||
private final EventsGrpc.EventsStub eventsStub; | ||
|
||
/** | ||
* Constructs a new EventClient with the specified managed channel. | ||
* | ||
* @param managedChannel the managed channel used for communication with the | ||
* server | ||
*/ | ||
public EventClient(ManagedChannel managedChannel) { | ||
eventsBlockingStub = EventsGrpc.newBlockingStub(managedChannel); | ||
eventsStub = EventsGrpc.newStub(managedChannel); | ||
} | ||
|
||
/** | ||
* Retrieves a list of event subscription response records for the given session | ||
* ID and result IDs. | ||
* | ||
* @param sessionId the session ID for which events are requested | ||
* @param resultIds the list of result IDs for which events are requested | ||
* @return a list of EventSubscriptionResponseRecord objects representing the | ||
* events | ||
*/ | ||
public List<EventSubscriptionResponseRecord> getEvents(String sessionId, List<String> resultIds) { | ||
EventSubscriptionRequest request = CreateEventSubscriptionRequest(sessionId, resultIds); | ||
return mapToRecord(sessionId, request, resultIds); | ||
} | ||
|
||
/** | ||
* Maps the received event subscription response to | ||
* EventSubscriptionResponseRecord objects. | ||
* | ||
* @param sessionId the session ID for which events are being mapped | ||
* @param request the event subscription request | ||
* @return a list of EventSubscriptionResponseRecord objects representing the | ||
* events | ||
*/ | ||
private List<EventSubscriptionResponseRecord> mapToRecord(String sessionId, EventSubscriptionRequest request, | ||
List<String> resultIds) { | ||
List<EventSubscriptionResponseRecord> responseRecords = new ArrayList<>(); | ||
Iterator<EventSubscriptionResponse> events = eventsBlockingStub.getEvents(request); | ||
Set<String> resultsExpected = new HashSet<>(resultIds); | ||
|
||
while (events.hasNext()) { | ||
var esr = events.next(); | ||
resultsExpected.remove(esr.getNewResult().getResultId()); | ||
responseRecords | ||
.add(new EventSubscriptionResponseRecord(sessionId, | ||
esr.getTaskStatusUpdate(), | ||
esr.getResultStatusUpdate(), | ||
esr.getResultOwnerUpdate(), | ||
esr.getNewTask(), | ||
esr.getNewResult())); | ||
if (resultsExpected.isEmpty()) { | ||
try { | ||
Thread.sleep(10000); | ||
} catch (InterruptedException e) { | ||
System.out.println("Thread was interrupted while sleeping"); | ||
} | ||
break; | ||
} | ||
} | ||
return responseRecords; | ||
} | ||
|
||
/** | ||
* Retrieves a list of event subscription response records for the given session | ||
* asynchrone | ||
* ID and result IDs. | ||
* | ||
* @param sessionId the session ID for which events are requested | ||
* @param resultIds the list of result IDs for which events are requested | ||
* @return a list of EventSubscriptionResponseRecord objects representing the | ||
* events | ||
* @throws InterruptedException | ||
*/ | ||
public List<EventSubscriptionResponseRecord> getEventResponseRecords(String sessionId, List<String> resultIds) | ||
throws InterruptedException { | ||
|
||
EventSubscriptionRequest request = CreateEventSubscriptionRequest(sessionId, resultIds); | ||
List<EventSubscriptionResponseRecord> responseRecords = new ArrayList<>(); | ||
CountDownLatch finishLatch = new CountDownLatch(1); | ||
|
||
StreamObserver<EventSubscriptionResponse> responseObserver = new StreamObserver<EventSubscriptionResponse>() { | ||
|
||
@Override | ||
public void onNext(EventSubscriptionResponse esr) { | ||
responseRecords.add(new EventSubscriptionResponseRecord( | ||
sessionId, | ||
esr.getTaskStatusUpdate(), | ||
esr.getResultStatusUpdate(), | ||
esr.getResultOwnerUpdate(), | ||
esr.getNewTask(), | ||
esr.getNewResult())); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable t) { | ||
t.printStackTrace(); | ||
finishLatch.countDown(); | ||
} | ||
|
||
@Override | ||
public void onCompleted() { | ||
System.out.println("Stream completed"); | ||
finishLatch.countDown(); | ||
} | ||
}; | ||
|
||
eventsStub.getEvents(request, responseObserver); | ||
|
||
// Wait for the response observer to finish | ||
if (!finishLatch.await(1, TimeUnit.MINUTES)) { | ||
System.out.println("Request not completed within the timeout."); | ||
} | ||
|
||
return responseRecords; | ||
} | ||
|
||
/** | ||
* Creates an event subscription request with the specified session ID and | ||
* result IDs. | ||
* | ||
* @param sessionId the session ID for which event subscription is requested | ||
* @param resultIds the list of result IDs to filter events | ||
* @return an EventSubscriptionRequest object configured with the provided | ||
* session ID and result IDs | ||
*/ | ||
public static EventSubscriptionRequest CreateEventSubscriptionRequest(String sessionId, List<String> resultIds) { | ||
FiltersCommon.FilterString filterString = FiltersCommon.FilterString.newBuilder() | ||
.setOperator(FiltersCommon.FilterStringOperator.FILTER_STRING_OPERATOR_EQUAL) | ||
.build(); | ||
|
||
ResultsFields.ResultField.Builder resultField = ResultsFields.ResultField.newBuilder() | ||
.setResultRawField(ResultsFields.ResultRawField.newBuilder().setField(RESULT_RAW_ENUM_FIELD_RESULT_ID)); | ||
|
||
ResultsFilters.FilterField.Builder filterFieldBuilder = ResultsFilters.FilterField.newBuilder() | ||
.setField(resultField) | ||
.setFilterString(filterString); | ||
|
||
ResultsFilters.Filters.Builder resultFiltersBuilder = ResultsFilters.Filters.newBuilder(); | ||
for (String resultId : resultIds) { | ||
filterFieldBuilder.setFilterString(FiltersCommon.FilterString.newBuilder().setValue(resultId).build()); | ||
resultFiltersBuilder.addOr(ResultsFilters.FiltersAnd.newBuilder().addAnd(filterFieldBuilder).build()); | ||
} | ||
|
||
return EventSubscriptionRequest.newBuilder() | ||
.setResultsFilters(resultFiltersBuilder.build()) | ||
.addReturnedEvents(EVENTS_ENUM_RESULT_STATUS_UPDATE) | ||
.addReturnedEvents(EVENTS_ENUM_NEW_RESULT) | ||
.setSessionId(sessionId) | ||
.build(); | ||
} | ||
} |
22 changes: 22 additions & 0 deletions
22
...java/src/main/java/armonik/client/event/util/records/EventSubscriptionResponseRecord.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package armonik.client.event.util.records; | ||
|
||
import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse.NewResult; | ||
import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse.NewTask; | ||
import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse.ResultOwnerUpdate; | ||
import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse.ResultStatusUpdate; | ||
import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse.TaskStatusUpdate; | ||
|
||
/** | ||
* EventSubscriptionResponseRecord represents a record containing subscription | ||
* response details for an event. | ||
* It encapsulates various attributes related to event subscription, such as | ||
* session ID, task status update, | ||
* result status update, result owner update, new task, and new result. | ||
*/ | ||
public record EventSubscriptionResponseRecord(String sessionId, | ||
TaskStatusUpdate taskStatusUpdate, | ||
ResultStatusUpdate resultStatusUpdate, | ||
ResultOwnerUpdate resultOwnerUpdate, | ||
NewTask newTask, | ||
NewResult newResult) { | ||
} |
Oops, something went wrong.