Skip to content

Commit

Permalink
OPIK-796: Add new automation rule evaluator logs endpoint (#1115)
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagohora authored Jan 22, 2025
1 parent 840b9d5 commit 890b0d9
Show file tree
Hide file tree
Showing 9 changed files with 307 additions and 115 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.comet.opik.api;

import lombok.Builder;
import lombok.NonNull;

import java.util.Map;
import java.util.UUID;

import static com.comet.opik.api.LogItem.LogLevel;

@Builder
public record LogCriteria(
@NonNull String workspaceId,
UUID entityId,
LogLevel level,
Integer size,
Map<String, String> markers) {
}
43 changes: 43 additions & 0 deletions apps/opik-backend/src/main/java/com/comet/opik/api/LogItem.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.comet.opik.api;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;

@Builder
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record LogItem(
@Schema(accessMode = Schema.AccessMode.READ_ONLY) Instant timestamp,
@JsonIgnore String workspaceId,
@Schema(accessMode = Schema.AccessMode.READ_ONLY) UUID ruleId,
@Schema(accessMode = Schema.AccessMode.READ_ONLY) LogLevel level,
@Schema(accessMode = Schema.AccessMode.READ_ONLY) String message,
@Schema(accessMode = Schema.AccessMode.READ_ONLY) Map<String, String> markers) {

public enum LogLevel {
INFO,
WARN,
ERROR,
DEBUG,
TRACE
}

@Builder
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record LogPage(List<LogItem> content, int page, int size, long total) implements Page<LogItem> {

public static LogPage empty(int page) {
return new LogPage(List.of(), page, 0, 0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.comet.opik.api.AutomationRuleEvaluator;
import com.comet.opik.api.AutomationRuleEvaluatorUpdate;
import com.comet.opik.api.BatchDelete;
import com.comet.opik.api.LogCriteria;
import com.comet.opik.api.Page;
import com.comet.opik.domain.AutomationRuleEvaluatorService;
import com.comet.opik.infrastructure.auth.RequestContext;
Expand Down Expand Up @@ -43,6 +44,7 @@

import static com.comet.opik.api.AutomationRuleEvaluator.AutomationRuleEvaluatorPage;
import static com.comet.opik.api.AutomationRuleEvaluator.View;
import static com.comet.opik.api.LogItem.LogPage;

@Path("/v1/private/automations/projects/{projectId}/evaluators/")
@Produces(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -164,4 +166,23 @@ public Response deleteEvaluators(
return Response.noContent().build();
}

@GET
@Path("/{id}/logs")
@Operation(operationId = "getEvaluatorLogsById", summary = "Get automation rule evaluator logs by id", description = "Get automation rule evaluator logs by id", responses = {
@ApiResponse(responseCode = "200", description = "Automation rule evaluator logs resource", content = @Content(schema = @Schema(implementation = LogPage.class)))
})
public Response getLogs(@PathParam("projectId") UUID projectId, @PathParam("id") UUID evaluatorId,
@QueryParam("size") @Min(1) @DefaultValue("1000") int size) {
String workspaceId = requestContext.get().getWorkspaceId();

log.info("Looking for logs for automated evaluator: id '{}' on project_id '{}' and workspace_id '{}'",
evaluatorId, projectId, workspaceId);
var criteria = LogCriteria.builder().workspaceId(workspaceId).entityId(evaluatorId).size(size).build();
LogPage logs = service.getLogs(criteria).block();
log.info("Found {} logs for automated evaluator: id '{}' on project_id '{}'and workspace_id '{}'", logs.size(),
evaluatorId, projectId, workspaceId);

return Response.ok().entity(logs).build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package com.comet.opik.domain;

import ch.qos.logback.classic.spi.ILoggingEvent;
import com.comet.opik.api.LogCriteria;
import com.comet.opik.api.LogItem;
import com.comet.opik.utils.TemplateUtils;
import com.google.inject.ImplementedBy;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.Statement;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.stringtemplate.v4.ST;
import reactor.core.publisher.Mono;

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static com.comet.opik.api.LogItem.LogLevel;
import static com.comet.opik.api.LogItem.LogPage;
import static com.comet.opik.infrastructure.log.tables.UserLogTableFactory.UserLogTableDAO;
import static com.comet.opik.utils.TemplateUtils.getQueryItemPlaceHolder;

@ImplementedBy(AutomationRuleEvaluatorLogsDAOImpl.class)
public interface AutomationRuleEvaluatorLogsDAO extends UserLogTableDAO {

static AutomationRuleEvaluatorLogsDAO create(ConnectionFactory factory) {
return new AutomationRuleEvaluatorLogsDAOImpl(factory);
}

Mono<LogPage> findLogs(LogCriteria criteria);

}

@Slf4j
@Singleton
@RequiredArgsConstructor(onConstructor_ = @Inject)
class AutomationRuleEvaluatorLogsDAOImpl implements AutomationRuleEvaluatorLogsDAO {

private static final String INSERT_STATEMENT = """
INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule_id, message, markers)
VALUES <items:{item |
(
parseDateTime64BestEffort(:timestamp<item.index>, 9),
:level<item.index>,
:workspace_id<item.index>,
:rule_id<item.index>,
:message<item.index>,
mapFromArrays(:marker_keys<item.index>, :marker_values<item.index>)
)
<if(item.hasNext)>,<endif>
}>
;
""";

public static final String FIND_ALL = """
SELECT * FROM automation_rule_evaluator_logs
WHERE workspace_id = :workspace_id
<if(level)> AND level = :level <endif>
<if(ruleId)> AND rule_id = :rule_id <endif>
ORDER BY timestamp DESC
<if(limit)> LIMIT :limit <endif><if(offset)> OFFSET :offset <endif>
""";

private final @NonNull ConnectionFactory connectionFactory;

public Mono<LogPage> findLogs(@NonNull LogCriteria criteria) {
return Mono.from(connectionFactory.create())
.flatMapMany(connection -> {

log.info("Finding logs with criteria: {}", criteria);

var template = new ST(FIND_ALL);

bindTemplateParameters(criteria, template);

Statement statement = connection.createStatement(template.render());

bindParameters(criteria, statement);

return statement.execute();

})
.flatMap(result -> result.map((row, rowMetadata) -> mapRow(row)))
.collectList()
.map(this::mapPage);
}

private LogPage mapPage(List<LogItem> logs) {
return LogPage.builder()
.content(logs)
.page(1)
.total(logs.size())
.size(logs.size())
.build();
}

private LogItem mapRow(Row row) {
return LogItem.builder()
.timestamp(row.get("timestamp", Instant.class))
.level(LogLevel.valueOf(row.get("level", String.class)))
.workspaceId(row.get("workspace_id", String.class))
.ruleId(row.get("rule_id", UUID.class))
.message(row.get("message", String.class))
.markers(row.get("markers", Map.class))
.build();
}

private void bindTemplateParameters(LogCriteria criteria, ST template) {
Optional.ofNullable(criteria.level()).ifPresent(level -> template.add("level", level));
Optional.ofNullable(criteria.entityId()).ifPresent(ruleId -> template.add("ruleId", ruleId));
Optional.ofNullable(criteria.size()).ifPresent(limit -> template.add("limit", limit));
}

private void bindParameters(LogCriteria criteria, Statement statement) {
statement.bind("workspace_id", criteria.workspaceId());
Optional.ofNullable(criteria.level()).ifPresent(level -> statement.bind("level", level));
Optional.ofNullable(criteria.entityId()).ifPresent(ruleId -> statement.bind("rule_id", ruleId));
Optional.ofNullable(criteria.size()).ifPresent(limit -> statement.bind("limit", limit));
}

@Override
public Mono<Void> saveAll(@NonNull List<ILoggingEvent> events) {

return Mono.from(connectionFactory.create())
.flatMapMany(connection -> {
var template = new ST(INSERT_STATEMENT);

List<TemplateUtils.QueryItem> queryItems = getQueryItemPlaceHolder(events.size());

template.add("items", queryItems);

Statement statement = connection.createStatement(template.render());

for (int i = 0; i < events.size(); i++) {

ILoggingEvent event = events.get(i);
String logLevel = event.getLevel().toString();
String workspaceId = Optional.ofNullable(event.getMDCPropertyMap().get("workspace_id"))
.orElseThrow(() -> failWithMessage("workspace_id is not set"));
String traceId = Optional.ofNullable(event.getMDCPropertyMap().get("trace_id"))
.orElseThrow(() -> failWithMessage("trace_id is not set"));
String ruleId = Optional.ofNullable(event.getMDCPropertyMap().get("rule_id"))
.orElseThrow(() -> failWithMessage("rule_id is not set"));

statement
.bind("timestamp" + i, event.getInstant().toString())
.bind("level" + i, logLevel)
.bind("workspace_id" + i, workspaceId)
.bind("rule_id" + i, ruleId)
.bind("message" + i, event.getFormattedMessage())
.bind("marker_keys" + i, new String[]{"trace_id"})
.bind("marker_values" + i, new String[]{traceId});
}

return statement.execute();

})
.collectList()
.then();

}

private IllegalStateException failWithMessage(String message) {
log.error(message);
return new IllegalStateException(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.comet.opik.api.AutomationRuleEvaluatorLlmAsJudge;
import com.comet.opik.api.AutomationRuleEvaluatorType;
import com.comet.opik.api.AutomationRuleEvaluatorUpdate;
import com.comet.opik.api.LogCriteria;
import com.comet.opik.api.error.EntityAlreadyExistsException;
import com.comet.opik.api.error.ErrorMessage;
import com.comet.opik.infrastructure.cache.CacheEvict;
Expand All @@ -19,6 +20,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.statement.UnableToExecuteStatementException;
import reactor.core.publisher.Mono;
import ru.vyarus.guicey.jdbi3.tx.TransactionTemplate;

import java.sql.SQLIntegrityConstraintViolationException;
Expand All @@ -28,6 +30,7 @@
import java.util.UUID;

import static com.comet.opik.api.AutomationRuleEvaluator.AutomationRuleEvaluatorPage;
import static com.comet.opik.api.LogItem.LogPage;
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.READ_ONLY;
import static com.comet.opik.infrastructure.db.TransactionTemplateAsync.WRITE;

Expand All @@ -50,6 +53,8 @@ AutomationRuleEvaluatorPage find(@NonNull UUID projectId, @NonNull String worksp

List<AutomationRuleEvaluatorLlmAsJudge> findAll(@NonNull UUID projectId, @NonNull String workspaceId,
AutomationRuleEvaluatorType automationRuleEvaluatorType);

Mono<LogPage> getLogs(LogCriteria criteria);
}

@Singleton
Expand All @@ -61,6 +66,7 @@ class AutomationRuleEvaluatorServiceImpl implements AutomationRuleEvaluatorServi

private final @NonNull IdGenerator idGenerator;
private final @NonNull TransactionTemplate template;
private final @NonNull AutomationRuleEvaluatorLogsDAO logsDAO;

@Override
@CacheEvict(name = "automation_rule_evaluators_find_by_type", key = "$projectId +'-'+ $workspaceId +'-'+ $inputRuleEvaluator.type")
Expand Down Expand Up @@ -250,4 +256,9 @@ public List<AutomationRuleEvaluatorLlmAsJudge> findAll(@NonNull UUID projectId,
});
}

@Override
public Mono<LogPage> getLogs(@NonNull LogCriteria criteria) {
return logsDAO.findLogs(criteria);
}

}
Loading

0 comments on commit 890b0d9

Please sign in to comment.