diff --git a/src/main/java/de/tum/cit/aet/artemis/core/service/LLMTokenUsageService.java b/src/main/java/de/tum/cit/aet/artemis/core/service/LLMTokenUsageService.java index 5ffe5f379ff5..c3dc2af1e519 100644 --- a/src/main/java/de/tum/cit/aet/artemis/core/service/LLMTokenUsageService.java +++ b/src/main/java/de/tum/cit/aet/artemis/core/service/LLMTokenUsageService.java @@ -81,6 +81,16 @@ public void appendRequestsToTrace(List requests, LLMTokenUsageTrace llmTokenUsageRequestRepository.saveAll(requestSet); } + /** + * Finds an LLMTokenUsageTrace by its ID. + * + * @param id The ID of the LLMTokenUsageTrace to find. + * @return An Optional containing the LLMTokenUsageTrace if found, or an empty Optional otherwise. + */ + public Optional findLLMTokenUsageTraceById(Long id) { + return llmTokenUsageTraceRepository.findById(id); + } + /** * Class LLMTokenUsageBuilder to be used for saveLLMTokenUsage() */ diff --git a/src/main/java/de/tum/cit/aet/artemis/iris/service/IrisCompetencyGenerationService.java b/src/main/java/de/tum/cit/aet/artemis/iris/service/IrisCompetencyGenerationService.java index f8d2a0201198..49e08cae1fd7 100644 --- a/src/main/java/de/tum/cit/aet/artemis/iris/service/IrisCompetencyGenerationService.java +++ b/src/main/java/de/tum/cit/aet/artemis/iris/service/IrisCompetencyGenerationService.java @@ -75,7 +75,7 @@ public void executeCompetencyExtractionPipeline(User user, Course course, String * @param job Job related to the status update * @param statusUpdate the status update containing the new competency recommendations */ - public void handleStatusUpdate(CompetencyExtractionJob job, PyrisCompetencyStatusUpdateDTO statusUpdate) { + public CompetencyExtractionJob handleStatusUpdate(CompetencyExtractionJob job, PyrisCompetencyStatusUpdateDTO statusUpdate) { Course course = courseRepository.findByIdForUpdateElseThrow(job.courseId()); if (statusUpdate.tokens() != null && !statusUpdate.tokens().isEmpty()) { llmTokenUsageService.saveLLMTokenUsage(statusUpdate.tokens(), LLMServiceType.IRIS, builder -> builder.withCourse(course.getId()).withUser(job.userId())); @@ -83,6 +83,8 @@ public void handleStatusUpdate(CompetencyExtractionJob job, PyrisCompetencyStatu var user = userRepository.findById(job.userId()).orElseThrow(); websocketService.send(user.getLogin(), websocketTopic(job.courseId()), statusUpdate); + + return job; } private static String websocketTopic(long courseId) { diff --git a/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/PyrisJobService.java b/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/PyrisJobService.java index 7933e9e20920..16e8969bc463 100644 --- a/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/PyrisJobService.java +++ b/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/PyrisJobService.java @@ -78,14 +78,14 @@ public String createTokenForJob(Function tokenToJobFunction) { public String addExerciseChatJob(Long courseId, Long exerciseId, Long sessionId) { var token = generateJobIdToken(); - var job = new ExerciseChatJob(token, courseId, exerciseId, sessionId); + var job = new ExerciseChatJob(token, courseId, exerciseId, sessionId, null); jobMap.put(token, job); return token; } public String addCourseChatJob(Long courseId, Long sessionId) { var token = generateJobIdToken(); - var job = new CourseChatJob(token, courseId, sessionId); + var job = new CourseChatJob(token, courseId, sessionId, null); jobMap.put(token, job); return token; } @@ -107,10 +107,19 @@ public String addIngestionWebhookJob() { /** * Remove a job from the job map. * - * @param token the token + * @param job the job to remove + */ + public void removeJob(PyrisJob job) { + jobMap.remove(job.jobId()); + } + + /** + * Store a job in the job map. + * + * @param job the job to store */ - public void removeJob(String token) { - jobMap.remove(token); + public void updateJob(PyrisJob job) { + jobMap.put(job.jobId(), job); } /** diff --git a/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/PyrisStatusUpdateService.java b/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/PyrisStatusUpdateService.java index 1526311fe8c7..cdd398e5c683 100644 --- a/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/PyrisStatusUpdateService.java +++ b/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/PyrisStatusUpdateService.java @@ -20,7 +20,9 @@ import de.tum.cit.aet.artemis.iris.service.pyris.job.CourseChatJob; import de.tum.cit.aet.artemis.iris.service.pyris.job.ExerciseChatJob; import de.tum.cit.aet.artemis.iris.service.pyris.job.IngestionWebhookJob; +import de.tum.cit.aet.artemis.iris.service.pyris.job.PyrisJob; import de.tum.cit.aet.artemis.iris.service.pyris.job.TextExerciseChatJob; +import de.tum.cit.aet.artemis.iris.service.pyris.job.TrackedSessionBasedPyrisJob; import de.tum.cit.aet.artemis.iris.service.session.IrisCourseChatSessionService; import de.tum.cit.aet.artemis.iris.service.session.IrisExerciseChatSessionService; import de.tum.cit.aet.artemis.iris.service.session.IrisTextExerciseChatSessionService; @@ -52,15 +54,16 @@ public PyrisStatusUpdateService(PyrisJobService pyrisJobService, IrisExerciseCha } /** - * Handles the status update of a exercise chat job and forwards it to {@link IrisExerciseChatSessionService#handleStatusUpdate(ExerciseChatJob, PyrisChatStatusUpdateDTO)} + * Handles the status update of a exercise chat job and forwards it to + * {@link IrisExerciseChatSessionService#handleStatusUpdate(TrackedSessionBasedPyrisJob, PyrisChatStatusUpdateDTO)} * * @param job the job that is updated * @param statusUpdate the status update */ public void handleStatusUpdate(ExerciseChatJob job, PyrisChatStatusUpdateDTO statusUpdate) { - irisExerciseChatSessionService.handleStatusUpdate(job, statusUpdate); + var updatedJob = irisExerciseChatSessionService.handleStatusUpdate(job, statusUpdate); - removeJobIfTerminated(statusUpdate.stages(), job.jobId()); + removeJobIfTerminatedElseUpdate(statusUpdate.stages(), updatedJob); } /** @@ -71,22 +74,22 @@ public void handleStatusUpdate(ExerciseChatJob job, PyrisChatStatusUpdateDTO sta * @param statusUpdate the status update */ public void handleStatusUpdate(TextExerciseChatJob job, PyrisTextExerciseChatStatusUpdateDTO statusUpdate) { - irisTextExerciseChatSessionService.handleStatusUpdate(job, statusUpdate); + var updatedJob = irisTextExerciseChatSessionService.handleStatusUpdate(job, statusUpdate); - removeJobIfTerminated(statusUpdate.stages(), job.jobId()); + removeJobIfTerminatedElseUpdate(statusUpdate.stages(), updatedJob); } /** * Handles the status update of a course chat job and forwards it to - * {@link de.tum.cit.aet.artemis.iris.service.session.IrisCourseChatSessionService#handleStatusUpdate(CourseChatJob, PyrisChatStatusUpdateDTO)} + * {@link de.tum.cit.aet.artemis.iris.service.session.IrisCourseChatSessionService#handleStatusUpdate(TrackedSessionBasedPyrisJob, PyrisChatStatusUpdateDTO)} * * @param job the job that is updated * @param statusUpdate the status update */ public void handleStatusUpdate(CourseChatJob job, PyrisChatStatusUpdateDTO statusUpdate) { - courseChatSessionService.handleStatusUpdate(job, statusUpdate); + var updatedJob = courseChatSessionService.handleStatusUpdate(job, statusUpdate); - removeJobIfTerminated(statusUpdate.stages(), job.jobId()); + removeJobIfTerminatedElseUpdate(statusUpdate.stages(), updatedJob); } /** @@ -97,26 +100,29 @@ public void handleStatusUpdate(CourseChatJob job, PyrisChatStatusUpdateDTO statu * @param statusUpdate the status update */ public void handleStatusUpdate(CompetencyExtractionJob job, PyrisCompetencyStatusUpdateDTO statusUpdate) { - competencyGenerationService.handleStatusUpdate(job, statusUpdate); + var updatedJob = competencyGenerationService.handleStatusUpdate(job, statusUpdate); - removeJobIfTerminated(statusUpdate.stages(), job.jobId()); + removeJobIfTerminatedElseUpdate(statusUpdate.stages(), updatedJob); } /** - * Removes the job from the job service if the status update indicates that the job is terminated. - * This is the case if all stages are in a terminal state. + * Removes the job from the job service if the status update indicates that the job is terminated; updates it to distribute changes otherwise. + * A job is terminated if all stages are in a terminal state. *

* * @see PyrisStageState#isTerminal() * * @param stages the stages of the status update - * @param job the job to remove + * @param job the job to remove or to update */ - private void removeJobIfTerminated(List stages, String job) { + private void removeJobIfTerminatedElseUpdate(List stages, PyrisJob job) { var isDone = stages.stream().map(PyrisStageDTO::state).allMatch(PyrisStageState::isTerminal); if (isDone) { pyrisJobService.removeJob(job); } + else { + pyrisJobService.updateJob(job); + } } /** @@ -128,6 +134,6 @@ private void removeJobIfTerminated(List stages, String job) { */ public void handleStatusUpdate(IngestionWebhookJob job, PyrisLectureIngestionStatusUpdateDTO statusUpdate) { statusUpdate.stages().forEach(stage -> log.info(stage.name() + ":" + stage.message())); - removeJobIfTerminated(statusUpdate.stages(), job.jobId()); + removeJobIfTerminatedElseUpdate(statusUpdate.stages(), job); } } diff --git a/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/job/CourseChatJob.java b/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/job/CourseChatJob.java index c05cbf9b94ea..2f389e22ed96 100644 --- a/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/job/CourseChatJob.java +++ b/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/job/CourseChatJob.java @@ -9,10 +9,15 @@ * This job is used to reference the details of a course chat session when Pyris sends a status update. */ @JsonInclude(JsonInclude.Include.NON_EMPTY) -public record CourseChatJob(String jobId, long courseId, long sessionId) implements SessionBasedPyrisJob { +public record CourseChatJob(String jobId, long courseId, long sessionId, Long traceId) implements TrackedSessionBasedPyrisJob { @Override public boolean canAccess(Course course) { return courseId == course.getId(); } + + @Override + public TrackedSessionBasedPyrisJob withTraceId(long traceId) { + return new CourseChatJob(jobId, courseId, sessionId, traceId); + } } diff --git a/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/job/ExerciseChatJob.java b/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/job/ExerciseChatJob.java index 1c2278cb2697..f74e7360be82 100644 --- a/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/job/ExerciseChatJob.java +++ b/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/job/ExerciseChatJob.java @@ -10,7 +10,7 @@ * This job is used to reference the details of a exercise chat session when Pyris sends a status update. */ @JsonInclude(JsonInclude.Include.NON_EMPTY) -public record ExerciseChatJob(String jobId, long courseId, long exerciseId, long sessionId) implements SessionBasedPyrisJob { +public record ExerciseChatJob(String jobId, long courseId, long exerciseId, long sessionId, Long traceId) implements TrackedSessionBasedPyrisJob { @Override public boolean canAccess(Course course) { @@ -21,4 +21,9 @@ public boolean canAccess(Course course) { public boolean canAccess(Exercise exercise) { return exercise.getId().equals(exerciseId); } + + @Override + public TrackedSessionBasedPyrisJob withTraceId(long traceId) { + return new ExerciseChatJob(jobId, courseId, exerciseId, sessionId, traceId); + } } diff --git a/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/job/SessionBasedPyrisJob.java b/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/job/SessionBasedPyrisJob.java deleted file mode 100644 index 03c2e4007838..000000000000 --- a/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/job/SessionBasedPyrisJob.java +++ /dev/null @@ -1,9 +0,0 @@ -package de.tum.cit.aet.artemis.iris.service.pyris.job; - -/** - * An interface Pyris job that is associated with a session. - */ -public interface SessionBasedPyrisJob extends PyrisJob { - - long sessionId(); -} diff --git a/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/job/TrackedSessionBasedPyrisJob.java b/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/job/TrackedSessionBasedPyrisJob.java new file mode 100644 index 000000000000..bdd180103840 --- /dev/null +++ b/src/main/java/de/tum/cit/aet/artemis/iris/service/pyris/job/TrackedSessionBasedPyrisJob.java @@ -0,0 +1,14 @@ +package de.tum.cit.aet.artemis.iris.service.pyris.job; + +/** + * A Pyris job that has a session id and stored its own LLM usage tracing ID. + * This is used for chat jobs where we need to reference the trace ID later after chat suggestions have been generated. + */ +public interface TrackedSessionBasedPyrisJob extends PyrisJob { + + long sessionId(); + + Long traceId(); + + TrackedSessionBasedPyrisJob withTraceId(long traceId); +} diff --git a/src/main/java/de/tum/cit/aet/artemis/iris/service/session/AbstractIrisChatSessionService.java b/src/main/java/de/tum/cit/aet/artemis/iris/service/session/AbstractIrisChatSessionService.java index 16df99a68337..61198308d519 100644 --- a/src/main/java/de/tum/cit/aet/artemis/iris/service/session/AbstractIrisChatSessionService.java +++ b/src/main/java/de/tum/cit/aet/artemis/iris/service/session/AbstractIrisChatSessionService.java @@ -1,13 +1,12 @@ package de.tum.cit.aet.artemis.iris.service.session; -import java.util.HashMap; import java.util.List; +import java.util.Optional; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import de.tum.cit.aet.artemis.core.domain.LLMServiceType; -import de.tum.cit.aet.artemis.core.domain.LLMTokenUsageTrace; import de.tum.cit.aet.artemis.core.service.LLMTokenUsageService; import de.tum.cit.aet.artemis.iris.domain.message.IrisMessage; import de.tum.cit.aet.artemis.iris.domain.message.IrisMessageSender; @@ -16,7 +15,7 @@ import de.tum.cit.aet.artemis.iris.repository.IrisSessionRepository; import de.tum.cit.aet.artemis.iris.service.IrisMessageService; import de.tum.cit.aet.artemis.iris.service.pyris.dto.chat.PyrisChatStatusUpdateDTO; -import de.tum.cit.aet.artemis.iris.service.pyris.job.SessionBasedPyrisJob; +import de.tum.cit.aet.artemis.iris.service.pyris.job.TrackedSessionBasedPyrisJob; import de.tum.cit.aet.artemis.iris.service.websocket.IrisChatWebsocketService; public abstract class AbstractIrisChatSessionService implements IrisChatBasedFeatureInterface, IrisRateLimitedFeatureInterface { @@ -31,8 +30,6 @@ public abstract class AbstractIrisChatSessionService private final ObjectMapper objectMapper; - protected final HashMap traces = new HashMap<>(); - public AbstractIrisChatSessionService(IrisSessionRepository irisSessionRepository, ObjectMapper objectMapper, IrisMessageService irisMessageService, IrisChatWebsocketService irisChatWebsocketService, LLMTokenUsageService llmTokenUsageService) { this.irisSessionRepository = irisSessionRepository; @@ -69,8 +66,9 @@ protected void updateLatestSuggestions(S session, List latestSuggestions * * @param job The job that was executed * @param statusUpdate The status update of the job + * @return the same job record or a new job record with the same job id if changes were made */ - public void handleStatusUpdate(SessionBasedPyrisJob job, PyrisChatStatusUpdateDTO statusUpdate) { + public TrackedSessionBasedPyrisJob handleStatusUpdate(TrackedSessionBasedPyrisJob job, PyrisChatStatusUpdateDTO statusUpdate) { var session = (S) irisSessionRepository.findByIdWithMessagesAndContents(job.sessionId()); IrisMessage savedMessage; if (statusUpdate.result() != null) { @@ -84,6 +82,7 @@ public void handleStatusUpdate(SessionBasedPyrisJob job, PyrisChatStatusUpdateDT irisChatWebsocketService.sendStatusUpdate(session, statusUpdate.stages(), statusUpdate.suggestions(), statusUpdate.tokens()); } + TrackedSessionBasedPyrisJob updatedJob = job; if (statusUpdate.tokens() != null && !statusUpdate.tokens().isEmpty()) { if (savedMessage != null) { // generated message is first sent and generated trace is saved @@ -92,27 +91,27 @@ public void handleStatusUpdate(SessionBasedPyrisJob job, PyrisChatStatusUpdateDT this.setLLMTokenUsageParameters(builder, session); return builder; }); - traces.put(job.jobId(), llmTokenUsageTrace); + + updatedJob = job.withTraceId(llmTokenUsageTrace.getId()); } else { - // interaction suggestion is sent and appended to the generated trace if it exists, trace is then removed, - // because interaction suggestion is the last message from Iris in the pipeline - if (traces.containsKey(job.jobId())) { - var trace = traces.get(job.jobId()); + // interaction suggestion is sent and appended to the generated trace if it exists + Optional.ofNullable(job.traceId()).flatMap(llmTokenUsageService::findLLMTokenUsageTraceById).ifPresentOrElse(trace -> { llmTokenUsageService.appendRequestsToTrace(statusUpdate.tokens(), trace); - traces.remove(job.jobId()); - } - else { + irisSessionRepository.save(session); + }, () -> { llmTokenUsageService.saveLLMTokenUsage(statusUpdate.tokens(), LLMServiceType.IRIS, builder -> { builder.withUser(session.getUser().getId()); this.setLLMTokenUsageParameters(builder, session); return builder; }); - } + }); } } updateLatestSuggestions(session, statusUpdate.suggestions()); + + return updatedJob; } protected abstract void setLLMTokenUsageParameters(LLMTokenUsageService.LLMTokenUsageBuilder builder, S session); diff --git a/src/main/java/de/tum/cit/aet/artemis/iris/service/session/IrisTextExerciseChatSessionService.java b/src/main/java/de/tum/cit/aet/artemis/iris/service/session/IrisTextExerciseChatSessionService.java index 4520417aad48..5682580ccb89 100644 --- a/src/main/java/de/tum/cit/aet/artemis/iris/service/session/IrisTextExerciseChatSessionService.java +++ b/src/main/java/de/tum/cit/aet/artemis/iris/service/session/IrisTextExerciseChatSessionService.java @@ -116,7 +116,8 @@ public void requestAndHandleResponse(IrisTextExerciseChatSession irisSession) { * @param job The job that is updated * @param statusUpdate The status update */ - public void handleStatusUpdate(TextExerciseChatJob job, PyrisTextExerciseChatStatusUpdateDTO statusUpdate) { + public TextExerciseChatJob handleStatusUpdate(TextExerciseChatJob job, PyrisTextExerciseChatStatusUpdateDTO statusUpdate) { + // TODO: LLM Token Tracking - or better, make this class a subclass of AbstractIrisChatSessionService var session = (IrisTextExerciseChatSession) irisSessionRepository.findByIdElseThrow(job.sessionId()); if (statusUpdate.result() != null) { var message = session.newMessage(); @@ -127,6 +128,8 @@ public void handleStatusUpdate(TextExerciseChatJob job, PyrisTextExerciseChatSta else { irisChatWebsocketService.sendMessage(session, null, statusUpdate.stages()); } + + return job; } @Override