diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/LogCriteria.java b/apps/opik-backend/src/main/java/com/comet/opik/api/LogCriteria.java new file mode 100644 index 0000000000..d8e8c8fcc3 --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/LogCriteria.java @@ -0,0 +1,18 @@ +package com.comet.opik.api; + +import lombok.Builder; +import lombok.NonNull; + +import java.util.Map; +import java.util.UUID; + +import static com.comet.opik.api.LogItem.LogLevel; + +@Builder +public record LogCriteria( + @NonNull String workspaceId, + UUID entityId, + LogLevel level, + Integer size, + Map<String, String> markers) { +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/LogItem.java b/apps/opik-backend/src/main/java/com/comet/opik/api/LogItem.java new file mode 100644 index 0000000000..ec9ed59a88 --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/LogItem.java @@ -0,0 +1,43 @@ +package com.comet.opik.api; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Builder; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +@Builder +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) +public record LogItem( + @Schema(accessMode = Schema.AccessMode.READ_ONLY) Instant timestamp, + @JsonIgnore String workspaceId, + @Schema(accessMode = Schema.AccessMode.READ_ONLY) UUID ruleId, + @Schema(accessMode = Schema.AccessMode.READ_ONLY) LogLevel level, + @Schema(accessMode = Schema.AccessMode.READ_ONLY) String message, + @Schema(accessMode = Schema.AccessMode.READ_ONLY) Map<String, String> markers) { + + public enum LogLevel { + INFO, + WARN, + ERROR, + DEBUG, + TRACE + } + + @Builder + 
@JsonIgnoreProperties(ignoreUnknown = true) + @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) + public record LogPage(List<LogItem> content, int page, int size, long total) implements Page<LogItem> { + + public static LogPage empty(int page) { + return new LogPage(List.of(), page, 0, 0); + } + } +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/AutomationRuleEvaluatorsResource.java b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/AutomationRuleEvaluatorsResource.java index 4079a97195..01ab6333f1 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/AutomationRuleEvaluatorsResource.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/api/resources/v1/priv/AutomationRuleEvaluatorsResource.java @@ -4,6 +4,7 @@ import com.comet.opik.api.AutomationRuleEvaluator; import com.comet.opik.api.AutomationRuleEvaluatorUpdate; import com.comet.opik.api.BatchDelete; +import com.comet.opik.api.LogCriteria; import com.comet.opik.api.Page; import com.comet.opik.domain.AutomationRuleEvaluatorService; import com.comet.opik.infrastructure.auth.RequestContext; @@ -43,6 +44,7 @@ import static com.comet.opik.api.AutomationRuleEvaluator.AutomationRuleEvaluatorPage; import static com.comet.opik.api.AutomationRuleEvaluator.View; +import static com.comet.opik.api.LogItem.LogPage; @Path("/v1/private/automations/projects/{projectId}/evaluators/") @Produces(MediaType.APPLICATION_JSON) @@ -164,4 +166,23 @@ public Response deleteEvaluators( return Response.noContent().build(); } + @GET + @Path("/{id}/logs") + @Operation(operationId = "getEvaluatorLogsById", summary = "Get automation rule evaluator logs by id", description = "Get automation rule evaluator logs by id", responses = { + @ApiResponse(responseCode = "200", description = "Automation rule evaluator logs resource", content = @Content(schema = @Schema(implementation = LogPage.class))) + }) + public Response getLogs(@PathParam("projectId") UUID projectId, 
@PathParam("id") UUID evaluatorId, + @QueryParam("size") @Min(1) @DefaultValue("1000") int size) { + String workspaceId = requestContext.get().getWorkspaceId(); + + log.info("Looking for logs for automated evaluator: id '{}' on project_id '{}' and workspace_id '{}'", + evaluatorId, projectId, workspaceId); + var criteria = LogCriteria.builder().workspaceId(workspaceId).entityId(evaluatorId).size(size).build(); + LogPage logs = service.getLogs(criteria).block(); + log.info("Found {} logs for automated evaluator: id '{}' on project_id '{}'and workspace_id '{}'", logs.size(), + evaluatorId, projectId, workspaceId); + + return Response.ok().entity(logs).build(); + } + } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorLogsDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorLogsDAO.java new file mode 100644 index 0000000000..c4e77a748b --- /dev/null +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorLogsDAO.java @@ -0,0 +1,182 @@ +package com.comet.opik.domain; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import com.comet.opik.api.LogCriteria; +import com.comet.opik.api.LogItem; +import com.comet.opik.utils.TemplateUtils; +import com.google.inject.ImplementedBy; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.Row; +import io.r2dbc.spi.Statement; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.stringtemplate.v4.ST; +import reactor.core.publisher.Mono; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import static com.comet.opik.api.LogItem.LogLevel; +import static com.comet.opik.api.LogItem.LogPage; +import static com.comet.opik.infrastructure.log.tables.UserLogTableFactory.UserLogTableDAO; +import static 
com.comet.opik.utils.TemplateUtils.getQueryItemPlaceHolder; + +@ImplementedBy(AutomationRuleEvaluatorLogsDAOImpl.class) +public interface AutomationRuleEvaluatorLogsDAO extends UserLogTableDAO { + + static AutomationRuleEvaluatorLogsDAO create(ConnectionFactory factory) { + return new AutomationRuleEvaluatorLogsDAOImpl(factory); + } + + Mono<LogPage> findLogs(LogCriteria criteria); + +} + +@Slf4j +@Singleton +@RequiredArgsConstructor(onConstructor_ = @Inject) +class AutomationRuleEvaluatorLogsDAOImpl implements AutomationRuleEvaluatorLogsDAO { + + private static final String INSERT_STATEMENT = """ + INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule_id, message, markers) + VALUES <items:{item | ( parseDateTime64BestEffort(:timestamp<item.index>, 9), + :level<item.index>, + :workspace_id<item.index>, + :rule_id<item.index>, + :message<item.index>, + mapFromArrays(:marker_keys<item.index>, :marker_values<item.index>) + ) + <if(item.hasNext)>,<endif> + }> + ; + + """; + + public static final String FIND_ALL = """ + + SELECT * FROM automation_rule_evaluator_logs + + WHERE workspace_id = :workspace_id + + <if(level)>AND level = :level<endif> + + <if(ruleId)>AND rule_id = :rule_id<endif> + + ORDER BY timestamp DESC + + <if(limit)>LIMIT :limit<endif> <if(offset)>OFFSET :offset<endif> + + """; + + private final @NonNull ConnectionFactory connectionFactory; + + public Mono<LogPage> findLogs(@NonNull LogCriteria criteria) { + return Mono.from(connectionFactory.create()) + .flatMapMany(connection -> { + + log.info("Finding logs with criteria: {}", criteria); + + var template = new ST(FIND_ALL); + + bindTemplateParameters(criteria, template); + + Statement statement = connection.createStatement(template.render()); + + bindParameters(criteria, statement); + + return statement.execute(); + + }) + .flatMap(result -> result.map((row, rowMetadata) -> mapRow(row))) + .collectList() + .map(this::mapPage); + } + + private LogPage mapPage(List<LogItem> logs) { + return LogPage.builder() + .content(logs) + .page(1) + .total(logs.size()) + .size(logs.size()) + .build(); + } + + private LogItem mapRow(Row row) { + return LogItem.builder() + .timestamp(row.get("timestamp", Instant.class)) + 
.level(LogLevel.valueOf(row.get("level", String.class))) + .workspaceId(row.get("workspace_id", String.class)) + .ruleId(row.get("rule_id", UUID.class)) + .message(row.get("message", String.class)) + .markers(row.get("markers", Map.class)) + .build(); + } + + private void bindTemplateParameters(LogCriteria criteria, ST template) { + Optional.ofNullable(criteria.level()).ifPresent(level -> template.add("level", level)); + Optional.ofNullable(criteria.entityId()).ifPresent(ruleId -> template.add("ruleId", ruleId)); + Optional.ofNullable(criteria.size()).ifPresent(limit -> template.add("limit", limit)); + } + + private void bindParameters(LogCriteria criteria, Statement statement) { + statement.bind("workspace_id", criteria.workspaceId()); + Optional.ofNullable(criteria.level()).ifPresent(level -> statement.bind("level", level)); + Optional.ofNullable(criteria.entityId()).ifPresent(ruleId -> statement.bind("rule_id", ruleId)); + Optional.ofNullable(criteria.size()).ifPresent(limit -> statement.bind("limit", limit)); + } + + @Override + public Mono<Void> saveAll(@NonNull List<ILoggingEvent> events) { + + return Mono.from(connectionFactory.create()) + .flatMapMany(connection -> { + var template = new ST(INSERT_STATEMENT); + + List<TemplateUtils.QueryItem> queryItems = getQueryItemPlaceHolder(events.size()); + + template.add("items", queryItems); + + Statement statement = connection.createStatement(template.render()); + + for (int i = 0; i < events.size(); i++) { + + ILoggingEvent event = events.get(i); + String logLevel = event.getLevel().toString(); + String workspaceId = Optional.ofNullable(event.getMDCPropertyMap().get("workspace_id")) + .orElseThrow(() -> failWithMessage("workspace_id is not set")); + String traceId = Optional.ofNullable(event.getMDCPropertyMap().get("trace_id")) + .orElseThrow(() -> failWithMessage("trace_id is not set")); + String ruleId = Optional.ofNullable(event.getMDCPropertyMap().get("rule_id")) + .orElseThrow(() -> failWithMessage("rule_id is not set")); + + statement + 
.bind("timestamp" + i, event.getInstant().toString()) + .bind("level" + i, logLevel) + .bind("workspace_id" + i, workspaceId) + .bind("rule_id" + i, ruleId) + .bind("message" + i, event.getFormattedMessage()) + .bind("marker_keys" + i, new String[]{"trace_id"}) + .bind("marker_values" + i, new String[]{traceId}); + } + + return statement.execute(); + + }) + .collectList() + .then(); + + } + + private IllegalStateException failWithMessage(String message) { + log.error(message); + return new IllegalStateException(message); + } +} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorService.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorService.java index 3335750068..b658e2084c 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorService.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/AutomationRuleEvaluatorService.java @@ -6,6 +6,7 @@ import com.comet.opik.api.AutomationRuleEvaluatorLlmAsJudge; import com.comet.opik.api.AutomationRuleEvaluatorType; import com.comet.opik.api.AutomationRuleEvaluatorUpdate; +import com.comet.opik.api.LogCriteria; import com.comet.opik.api.error.EntityAlreadyExistsException; import com.comet.opik.api.error.ErrorMessage; import com.comet.opik.infrastructure.cache.CacheEvict; @@ -19,6 +20,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jdbi.v3.core.statement.UnableToExecuteStatementException; +import reactor.core.publisher.Mono; import ru.vyarus.guicey.jdbi3.tx.TransactionTemplate; import java.sql.SQLIntegrityConstraintViolationException; @@ -28,6 +30,7 @@ import java.util.UUID; import static com.comet.opik.api.AutomationRuleEvaluator.AutomationRuleEvaluatorPage; +import static com.comet.opik.api.LogItem.LogPage; import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.READ_ONLY; import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.WRITE; 
@@ -50,6 +53,8 @@ AutomationRuleEvaluatorPage find(@NonNull UUID projectId, @NonNull String worksp List<AutomationRuleEvaluator<?>> findAll(@NonNull UUID projectId, @NonNull String workspaceId, AutomationRuleEvaluatorType automationRuleEvaluatorType); + + Mono<LogPage> getLogs(LogCriteria criteria); } @Singleton @@ -61,6 +66,7 @@ class AutomationRuleEvaluatorServiceImpl implements AutomationRuleEvaluatorServi private final @NonNull IdGenerator idGenerator; private final @NonNull TransactionTemplate template; + private final @NonNull AutomationRuleEvaluatorLogsDAO logsDAO; @Override @CacheEvict(name = "automation_rule_evaluators_find_by_type", key = "$projectId +'-'+ $workspaceId +'-'+ $inputRuleEvaluator.type") @@ -250,4 +256,9 @@ public List<AutomationRuleEvaluator<?>> findAll(@NonNull UUID projectId, }); } + @Override + public Mono<LogPage> getLogs(@NonNull LogCriteria criteria) { + return logsDAO.findLogs(criteria); + } + } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java index b24f3b4114..4507d7235e 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/ClickHouseAppender.java @@ -1,5 +1,6 @@ package com.comet.opik.infrastructure.log; +import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.AppenderBase; import com.comet.opik.domain.UserLog; @@ -17,6 +18,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static java.util.stream.Collectors.groupingBy; @@ -27,23 +29,20 @@ class ClickHouseAppender extends AppenderBase<ILoggingEvent> { private static ClickHouseAppender instance; - public static synchronized void init(@NonNull 
UserLogTableFactory userLogTableFactory, int batchSize, - @NonNull Duration flushIntervalDuration) { + public static synchronized ClickHouseAppender init(@NonNull UserLogTableFactory userLogTableFactory, int batchSize, + @NonNull Duration flushIntervalDuration, @NonNull LoggerContext context) { if (instance == null) { - setInstance(new ClickHouseAppender(userLogTableFactory, flushIntervalDuration, batchSize)); + ClickHouseAppender appender = new ClickHouseAppender(userLogTableFactory, flushIntervalDuration, batchSize); + setInstance(appender); + appender.setContext(context); instance.start(); } - } - public static synchronized ClickHouseAppender getInstance() { - if (instance == null) { - throw new IllegalStateException("ClickHouseAppender is not initialized"); - } return instance; } - private static synchronized void setInstance(ClickHouseAppender instance) { + private static void setInstance(ClickHouseAppender instance) { ClickHouseAppender.instance = instance; } @@ -52,17 +51,14 @@ private static synchronized void setInstance(ClickHouseAppender instance) { private final int batchSize; private volatile boolean running = true; - private BlockingQueue<ILoggingEvent> logQueue; - private ScheduledExecutorService scheduler; + private final BlockingQueue<ILoggingEvent> logQueue = new LinkedBlockingQueue<>(); + private final AtomicReference<ScheduledExecutorService> scheduler = new AtomicReference<>( + Executors.newSingleThreadScheduledExecutor()); @Override public void start() { - - logQueue = new LinkedBlockingQueue<>(); - scheduler = Executors.newSingleThreadScheduledExecutor(); - // Background flush thread - scheduler.scheduleAtFixedRate(this::flushLogs, flushIntervalDuration.toMillis(), + scheduler.get().scheduleAtFixedRate(this::flushLogs, flushIntervalDuration.toMillis(), flushIntervalDuration.toMillis(), TimeUnit.MILLISECONDS); super.start(); @@ -113,7 +109,7 @@ protected void append(ILoggingEvent event) { } if (logQueue.size() >= batchSize) { - scheduler.execute(this::flushLogs); + 
scheduler.get().execute(this::flushLogs); } } @@ -123,20 +119,23 @@ public void stop() { super.stop(); flushLogs(); setInstance(null); - scheduler.shutdown(); + scheduler.get().shutdown(); awaitTermination(); + logQueue.clear(); + scheduler.set(Executors.newSingleThreadScheduledExecutor()); } private void awaitTermination() { try { - if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { - scheduler.shutdownNow(); - if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { // Final attempt + if (!scheduler.get().awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.get().shutdownNow(); + if (!scheduler.get().awaitTermination(5, TimeUnit.SECONDS)) { // Final attempt log.error("ClickHouseAppender did not terminate"); } } } catch (InterruptedException ex) { - scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + scheduler.get().shutdownNow(); log.warn("ClickHouseAppender interrupted while waiting for termination", ex); } } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingLoggingFactory.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingLoggingFactory.java index 57c23e0f93..9d62dd2a47 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingLoggingFactory.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingLoggingFactory.java @@ -21,10 +21,8 @@ public static synchronized void init(@NonNull ConnectionFactory connectionFactor @NonNull Duration flushIntervalSeconds) { UserLogTableFactory tableFactory = UserLogTableFactory.getInstance(connectionFactory); - ClickHouseAppender.init(tableFactory, batchSize, flushIntervalSeconds); - - ClickHouseAppender clickHouseAppender = ClickHouseAppender.getInstance(); - clickHouseAppender.setContext(CONTEXT); + ClickHouseAppender clickHouseAppender = ClickHouseAppender.init(tableFactory, batchSize, flushIntervalSeconds, + CONTEXT); asyncAppender = new AsyncAppender(); 
asyncAppender.setContext(CONTEXT); diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java deleted file mode 100644 index 83f2a33307..0000000000 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/AutomationRuleEvaluatorLogDAO.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.comet.opik.infrastructure.log.tables; - -import ch.qos.logback.classic.spi.ILoggingEvent; -import com.comet.opik.utils.TemplateUtils; -import io.r2dbc.spi.ConnectionFactory; -import io.r2dbc.spi.Statement; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.stringtemplate.v4.ST; -import reactor.core.publisher.Mono; - -import java.util.List; -import java.util.Optional; - -import static com.comet.opik.infrastructure.log.tables.UserLogTableFactory.UserLogTableDAO; -import static com.comet.opik.utils.TemplateUtils.getQueryItemPlaceHolder; - -@RequiredArgsConstructor -@Slf4j -class AutomationRuleEvaluatorLogDAO implements UserLogTableDAO { - - private final ConnectionFactory factory; - - private static final String INSERT_STATEMENT = """ - INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule_id, message, markers) - VALUES <items:{item | ( parseDateTime64BestEffort(:timestamp<item.index>, 9), - :level<item.index>, - :workspace_id<item.index>, - :rule_id<item.index>, - :message<item.index>, - mapFromArrays(:marker_keys<item.index>, :marker_values<item.index>) - ) - <if(item.hasNext)>,<endif> - }> - ; - """; - - @Override - public Mono<Void> saveAll(List<ILoggingEvent> events) { - return Mono.from(factory.create()) - .flatMapMany(connection -> { - var template = new ST(INSERT_STATEMENT); - List<TemplateUtils.QueryItem> queryItems = getQueryItemPlaceHolder(events.size()); - - template.add("items", queryItems); - - Statement statement = connection.createStatement(template.render()); - - for (int i = 0; i < events.size(); i++) { - ILoggingEvent event = events.get(i); - - String logLevel = event.getLevel().toString(); - String workspaceId = 
Optional.ofNullable(event.getMDCPropertyMap().get("workspace_id")) - .orElseThrow(() -> failWithMessage("workspace_id is not set")); - String traceId = Optional.ofNullable(event.getMDCPropertyMap().get("trace_id")) - .orElseThrow(() -> failWithMessage("trace_id is not set")); - String ruleId = Optional.ofNullable(event.getMDCPropertyMap().get("rule_id")) - .orElseThrow(() -> failWithMessage("rule_id is not set")); - - statement - .bind("timestamp" + i, event.getInstant().toString()) - .bind("level" + i, logLevel) - .bind("workspace_id" + i, workspaceId) - .bind("rule_id" + i, ruleId) - .bind("message" + i, event.getFormattedMessage()) - .bind("marker_keys" + i, new String[]{"trace_id"}) - .bind("marker_values" + i, new String[]{traceId}); - } - - return statement.execute(); - }) - .collectList() - .then(); - } - - private IllegalStateException failWithMessage(String message) { - log.error(message); - return new IllegalStateException(message); - } - -} diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java index 9bce54ce50..5900326a46 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java @@ -1,13 +1,14 @@ package com.comet.opik.infrastructure.log.tables; import ch.qos.logback.classic.spi.ILoggingEvent; +import com.comet.opik.domain.AutomationRuleEvaluatorLogsDAO; import com.comet.opik.domain.UserLog; import io.r2dbc.spi.ConnectionFactory; import lombok.NonNull; -import lombok.RequiredArgsConstructor; import reactor.core.publisher.Mono; import java.util.List; +import java.util.Map; public interface UserLogTableFactory { @@ -23,15 +24,17 @@ interface UserLogTableDAO { } -@RequiredArgsConstructor class UserLogTableFactoryImpl implements UserLogTableFactory { 
- private final ConnectionFactory factory; + private final Map<UserLog, UserLogTableDAO> daoMap; + + UserLogTableFactoryImpl(@NonNull ConnectionFactory factory) { + daoMap = Map.of( + UserLog.AUTOMATION_RULE_EVALUATOR, AutomationRuleEvaluatorLogsDAO.create(factory)); + } @Override public UserLogTableDAO getDAO(@NonNull UserLog userLog) { - return switch (userLog) { - case AUTOMATION_RULE_EVALUATOR -> new AutomationRuleEvaluatorLogDAO(factory); - }; + return daoMap.get(userLog); } }