diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4ae127ebe..c8b0a6d55 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -336,3 +336,23 @@ jobs: if-no-files-found: error path: packages/cpp/tools/packaging/*.${{ matrix.type }} name: libarmonik.${{ matrix.type }} + + build-java-packages: + name: Build Java + runs-on: ubuntu-latest + defaults: + run: + working-directory: packages/java + steps: + - name: Checkout + uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4 + with: + fetch-depth: 0 + - name: Set up java 17 + uses: actions/setup-java@v4 + with: + distribution: oracle + java-version: 17 + cache: maven + - name: Build the package + run: mvn clean install -DskipTests diff --git a/.gitignore b/.gitignore index 0deaa71df..19ea261d4 100644 --- a/.gitignore +++ b/.gitignore @@ -381,6 +381,7 @@ dist !packages/python !packages/angular !packages/web +!packages/java packages/web/LICENSE !.docs/content/** @@ -390,3 +391,14 @@ packages/web/LICENSE **/.idea gen + +# Maven +target/ + +# Compiled class file +*.class + +# IDE-specific files +*.iml +*.ipr +*.iws diff --git a/packages/java/pom.xml b/packages/java/pom.xml new file mode 100644 index 000000000..45c41e740 --- /dev/null +++ b/packages/java/pom.xml @@ -0,0 +1,137 @@ + + + 4.0.0 + + org.aneo + armonik-java + GRPC java binding for the Armonik orchestrator API + 0.1.0 + + + 17 + 17 + UTF-8 + 0.6.1 + 1.7.1 + ../../Protos/V1 + 2.0.12 + 1.5.5 + + + + + io.grpc + grpc-netty + 1.62.2 + + + + + io.projectreactor + reactor-core + 3.6.5 + + + + + io.grpc + grpc-protobuf + 1.62.2 + + + io.grpc + grpc-stub + 1.62.2 + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + + javax.annotation + javax.annotation-api + 1.3.2 + + + + + ch.qos.logback + logback-classic + ${logback.version} + + + + + org.slf4j + slf4j-api + ${sl4j.version} + + + + org.junit.jupiter + junit-jupiter-engine + 5.10.2 + test + + + + + + + org.mockito + mockito-core + 5.11.0 + test + + + + + org.mockito + mockito-junit-jupiter + 5.11.0 + test + + + + + + + + + + kr.motd.maven + os-maven-plugin + ${os-maven-plugin.version} + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + ${protobuf-maven-plugin.version} + + ${proto.path} + + com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier} + + grpc-java + + io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier} + + + + + + compile + compile-custom + + + + + + + + diff --git a/packages/java/src/main/java/armonik/client/event/EventClient.java b/packages/java/src/main/java/armonik/client/event/EventClient.java new file mode 100644 index 000000000..c2c957e43 --- /dev/null +++ b/packages/java/src/main/java/armonik/client/event/EventClient.java @@ -0,0 +1,185 @@ +package armonik.client.event; + +import static armonik.api.grpc.v1.events.EventsCommon.EventsEnum.EVENTS_ENUM_NEW_RESULT; +import static armonik.api.grpc.v1.events.EventsCommon.EventsEnum.EVENTS_ENUM_RESULT_STATUS_UPDATE; +import static armonik.api.grpc.v1.results.ResultsFields.ResultRawEnumField.RESULT_RAW_ENUM_FIELD_RESULT_ID; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import armonik.api.grpc.v1.FiltersCommon; +import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionRequest; +import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse; +import armonik.api.grpc.v1.events.EventsGrpc; +import armonik.api.grpc.v1.events.EventsGrpc.EventsBlockingStub; +import armonik.api.grpc.v1.results.ResultsFields; +import armonik.api.grpc.v1.results.ResultsFilters; +import armonik.client.event.util.records.EventSubscriptionResponseRecord; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; + +/** + * EventClient is a client for interacting with event-related functionalities. + * It communicates with a gRPC server using a blocking stub to retrieve events. + */ +public class EventClient { + /** The blocking and nonblocking stub for communicating with the gRPC server. */ + private final EventsBlockingStub eventsBlockingStub; + private final EventsGrpc.EventsStub eventsStub; + + /** + * Constructs a new EventClient with the specified managed channel. + * + * @param managedChannel the managed channel used for communication with the + * server + */ + public EventClient(ManagedChannel managedChannel) { + eventsBlockingStub = EventsGrpc.newBlockingStub(managedChannel); + eventsStub = EventsGrpc.newStub(managedChannel); + } + + /** + * Retrieves a list of event subscription response records for the given session + * ID and result IDs. + * + * @param sessionId the session ID for which events are requested + * @param resultIds the list of result IDs for which events are requested + * @return a list of EventSubscriptionResponseRecord objects representing the + * events + */ + public List getEvents(String sessionId, List resultIds) { + EventSubscriptionRequest request = CreateEventSubscriptionRequest(sessionId, resultIds); + return mapToRecord(sessionId, request, resultIds); + } + + /** + * Maps the received event subscription response to + * EventSubscriptionResponseRecord objects. + * + * @param sessionId the session ID for which events are being mapped + * @param request the event subscription request + * @return a list of EventSubscriptionResponseRecord objects representing the + * events + */ + private List mapToRecord(String sessionId, EventSubscriptionRequest request, + List resultIds) { + List responseRecords = new ArrayList<>(); + Iterator events = eventsBlockingStub.getEvents(request); + Set resultsExpected = new HashSet<>(resultIds); + + while (events.hasNext()) { + var esr = events.next(); + resultsExpected.remove(esr.getNewResult().getResultId()); + responseRecords + .add(new EventSubscriptionResponseRecord(sessionId, + esr.getTaskStatusUpdate(), + esr.getResultStatusUpdate(), + esr.getResultOwnerUpdate(), + esr.getNewTask(), + esr.getNewResult())); + if (resultsExpected.isEmpty()) { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + System.out.println("Thread was interrupted while sleeping"); + } + break; + } + } + return responseRecords; + } + + /** + * Retrieves a list of event subscription response records for the given session + * asynchrone + * ID and result IDs. + * + * @param sessionId the session ID for which events are requested + * @param resultIds the list of result IDs for which events are requested + * @return a list of EventSubscriptionResponseRecord objects representing the + * events + * @throws InterruptedException + */ + public List getEventResponseRecords(String sessionId, List resultIds) + throws InterruptedException { + + EventSubscriptionRequest request = CreateEventSubscriptionRequest(sessionId, resultIds); + List responseRecords = new ArrayList<>(); + CountDownLatch finishLatch = new CountDownLatch(1); + + StreamObserver responseObserver = new StreamObserver() { + + @Override + public void onNext(EventSubscriptionResponse esr) { + responseRecords.add(new EventSubscriptionResponseRecord( + sessionId, + esr.getTaskStatusUpdate(), + esr.getResultStatusUpdate(), + esr.getResultOwnerUpdate(), + esr.getNewTask(), + esr.getNewResult())); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + finishLatch.countDown(); + } + + @Override + public void onCompleted() { + System.out.println("Stream completed"); + finishLatch.countDown(); + } + }; + + eventsStub.getEvents(request, responseObserver); + + // Wait for the response observer to finish + if (!finishLatch.await(1, TimeUnit.MINUTES)) { + System.out.println("Request not completed within the timeout."); + } + + return responseRecords; + } + + /** + * Creates an event subscription request with the specified session ID and + * result IDs. + * + * @param sessionId the session ID for which event subscription is requested + * @param resultIds the list of result IDs to filter events + * @return an EventSubscriptionRequest object configured with the provided + * session ID and result IDs + */ + public static EventSubscriptionRequest CreateEventSubscriptionRequest(String sessionId, List resultIds) { + FiltersCommon.FilterString filterString = FiltersCommon.FilterString.newBuilder() + .setOperator(FiltersCommon.FilterStringOperator.FILTER_STRING_OPERATOR_EQUAL) + .build(); + + ResultsFields.ResultField.Builder resultField = ResultsFields.ResultField.newBuilder() + .setResultRawField(ResultsFields.ResultRawField.newBuilder().setField(RESULT_RAW_ENUM_FIELD_RESULT_ID)); + + ResultsFilters.FilterField.Builder filterFieldBuilder = ResultsFilters.FilterField.newBuilder() + .setField(resultField) + .setFilterString(filterString); + + ResultsFilters.Filters.Builder resultFiltersBuilder = ResultsFilters.Filters.newBuilder(); + for (String resultId : resultIds) { + filterFieldBuilder.setFilterString(FiltersCommon.FilterString.newBuilder().setValue(resultId).build()); + resultFiltersBuilder.addOr(ResultsFilters.FiltersAnd.newBuilder().addAnd(filterFieldBuilder).build()); + } + + return EventSubscriptionRequest.newBuilder() + .setResultsFilters(resultFiltersBuilder.build()) + .addReturnedEvents(EVENTS_ENUM_RESULT_STATUS_UPDATE) + .addReturnedEvents(EVENTS_ENUM_NEW_RESULT) + .setSessionId(sessionId) + .build(); + } +} diff --git a/packages/java/src/main/java/armonik/client/event/util/records/EventSubscriptionResponseRecord.java b/packages/java/src/main/java/armonik/client/event/util/records/EventSubscriptionResponseRecord.java new file mode 100644 index 000000000..0cead396d --- /dev/null +++ b/packages/java/src/main/java/armonik/client/event/util/records/EventSubscriptionResponseRecord.java @@ -0,0 +1,22 @@ +package armonik.client.event.util.records; + +import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse.NewResult; +import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse.NewTask; +import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse.ResultOwnerUpdate; +import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse.ResultStatusUpdate; +import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse.TaskStatusUpdate; + +/** + * EventSubscriptionResponseRecord represents a record containing subscription + * response details for an event. + * It encapsulates various attributes related to event subscription, such as + * session ID, task status update, + * result status update, result owner update, new task, and new result. + */ +public record EventSubscriptionResponseRecord(String sessionId, + TaskStatusUpdate taskStatusUpdate, + ResultStatusUpdate resultStatusUpdate, + ResultOwnerUpdate resultOwnerUpdate, + NewTask newTask, + NewResult newResult) { +} diff --git a/packages/java/src/main/java/armonik/client/result/ResultClient.java b/packages/java/src/main/java/armonik/client/result/ResultClient.java new file mode 100644 index 000000000..eb401344e --- /dev/null +++ b/packages/java/src/main/java/armonik/client/result/ResultClient.java @@ -0,0 +1,158 @@ +package armonik.client.result; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.google.protobuf.ByteString; + +import armonik.api.grpc.v1.Objects; +import armonik.api.grpc.v1.results.ResultsCommon.CreateResultsMetaDataRequest; +import armonik.api.grpc.v1.results.ResultsCommon.CreateResultsRequest; +import armonik.api.grpc.v1.results.ResultsCommon.DownloadResultDataRequest; +import armonik.api.grpc.v1.results.ResultsCommon.DownloadResultDataResponse; +import armonik.api.grpc.v1.results.ResultsCommon.GetOwnerTaskIdRequest; +import armonik.api.grpc.v1.results.ResultsCommon.GetOwnerTaskIdResponse.MapResultTask; +import armonik.api.grpc.v1.results.ResultsCommon.GetResultRequest; +import armonik.api.grpc.v1.results.ResultsCommon.ListResultsRequest; +import armonik.api.grpc.v1.results.ResultsCommon.ListResultsRequest.Sort; +import armonik.api.grpc.v1.results.ResultsCommon.ResultRaw; +import armonik.api.grpc.v1.results.ResultsFilters.Filters; +import armonik.api.grpc.v1.results.ResultsGrpc; +import io.grpc.ManagedChannel; + +/** + * ResultClient provides methods for interacting with result-related + * functionalities. + * It communicates with a gRPC server using a blocking stub to perform various + * operations on results. + */ +public class ResultClient { + /** The blocking stub for communicating with the gRPC server. */ + private final ResultsGrpc.ResultsBlockingStub resultsBlockingStub; + + /** + * Constructs a new ResultClient with the specified managed channel. + * + * @param managedChannel the managed channel used for communication with the + * server + */ + public ResultClient(ManagedChannel managedChannel) { + this.resultsBlockingStub = ResultsGrpc.newBlockingStub(managedChannel); + } + + /** + * Retrieves the service configuration data chunk max size from the server. + * + * @return the service configuration data chunk max size + */ + public int getServiceConfiguration() { + return resultsBlockingStub.getServiceConfiguration(Objects.Empty.newBuilder().build()).getDataChunkMaxSize(); + } + + /** + * Downloads result data for the specified session ID and result ID. + * + * @param sessionId the session ID associated with the result to download + * @param resultId the result ID to download + * @return a list of byte arrays representing the downloaded data chunks + */ + public List downloadResultData(String sessionId, String resultId) { + DownloadResultDataRequest request = DownloadResultDataRequest.newBuilder() + .setSessionId(sessionId) + .setResultId(resultId) + .build(); + Iterator iterator = resultsBlockingStub.downloadResultData(request); + List list = new ArrayList<>(); + + iterator.forEachRemaining(list::add); + + return list.stream() + .map(DownloadResultDataResponse::getDataChunk) + .map(ByteString::toByteArray) + .toList(); + } + + /** + * Creates results based on the specified request. + * + * @param request the request containing the data to create results + * @return a list of ResultRaw objects representing the created results + */ + public List createResults(CreateResultsRequest request) { + return resultsBlockingStub.createResults(request).getResultsList(); + + } + + /** + * Creates metadata for results associated with the specified session ID and + * names. + * + * @param sessionId the session ID for which metadata is being created + * @param names the list of names for the results + * @return a list of ResultRaw objects representing the created metadata + */ + public List createResultsMetaData(String sessionId, List names) { + CreateResultsMetaDataRequest request = CreateResultsMetaDataRequest.newBuilder() + .setSessionId(sessionId) + .addAllResults(names.stream() + .map(name -> CreateResultsMetaDataRequest.ResultCreate.newBuilder().setName(name).build()).toList()) + .build(); + + return resultsBlockingStub.createResultsMetaData(request).getResultsList(); + } + + /** + * Retrieves a map of result IDs to task IDs for the specified session ID and + * result IDs. + * + * @param sessionId the session ID associated with the results + * @param resultIds the list of result IDs for which task IDs are requested + * @return a map where result IDs are mapped to their corresponding task IDs + */ + public Map getOwnerTaskId(String sessionId, List resultIds) { + GetOwnerTaskIdRequest request = GetOwnerTaskIdRequest.newBuilder() + .setSessionId(sessionId) + .addAllResultId(resultIds) + .build(); + return resultsBlockingStub.getOwnerTaskId(request).getResultTaskList() + .stream() + .collect(Collectors.toMap(MapResultTask::getResultId, MapResultTask::getTaskId)); + } + + /** + * Retrieves the result with the specified result ID. + * + * @param resultId the ID of the result to retrieve + * @return the ResultRaw object representing the retrieved result + */ + public ResultRaw getResult(String resultId) { + GetResultRequest request = GetResultRequest.newBuilder() + .setResultId(resultId) + .build(); + return resultsBlockingStub.getResult(request).getResult(); + } + + /** + * Lists results based on the specified filters, total count, pagination + * parameters, and sorting criteria. + * + * @param filters the filters to apply to the result list + * @param total the total count of results + * @param page the page number of the results to retrieve + * @param pageSize the size of each page of results + * @param sort the sorting criteria for the results + * @return a list of ResultRaw objects representing the retrieved results + */ + public List listResults(Filters filters, int total, int page, int pageSize, Sort sort) { + ListResultsRequest request = ListResultsRequest.newBuilder() + .setFilters(filters) + .setSort(sort) + .setPage(page) + .setPageSize(pageSize) + .build(); + return resultsBlockingStub.listResults(request).getResultsList(); + } +} diff --git a/packages/java/src/main/resources/logback.xml b/packages/java/src/main/resources/logback.xml new file mode 100644 index 000000000..0ec161833 --- /dev/null +++ b/packages/java/src/main/resources/logback.xml @@ -0,0 +1,13 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + +