Skip to content

Commit

Permalink
[OPIK-844] Add support for span comments (#1137)
Browse files Browse the repository at this point in the history
  • Loading branch information
BorisTkachenko authored Jan 27, 2025
1 parent 1a8ae96 commit 0fd113e
Show file tree
Hide file tree
Showing 24 changed files with 693 additions and 294 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
public record Comment(
@Schema(accessMode = Schema.AccessMode.READ_ONLY) UUID id,
@NotBlank String text,
Instant createdAt,
Instant lastUpdatedAt,
String createdBy,
String lastUpdatedBy) {
@Schema(accessMode = Schema.AccessMode.READ_ONLY) Instant createdAt,
@Schema(accessMode = Schema.AccessMode.READ_ONLY) Instant lastUpdatedAt,
@Schema(accessMode = Schema.AccessMode.READ_ONLY) String createdBy,
@Schema(accessMode = Schema.AccessMode.READ_ONLY) String lastUpdatedBy) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public record Span(
@JsonView({Span.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) String lastUpdatedBy,
@JsonView({
Span.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) List<FeedbackScore> feedbackScores,
@JsonView({Span.View.Public.class}) @Schema(accessMode = Schema.AccessMode.READ_ONLY) List<Comment> comments,
@JsonView({Span.View.Public.class, Span.View.Write.class}) @DecimalMin("0.0") BigDecimal totalEstimatedCost,
String totalEstimatedCostVersion,
@JsonView({
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.comet.opik.api.resources.v1.priv;

import com.codahale.metrics.annotation.Timed;
import com.comet.opik.api.BatchDelete;
import com.comet.opik.api.Comment;
import com.comet.opik.api.DeleteFeedbackScore;
import com.comet.opik.api.FeedbackDefinition;
import com.comet.opik.api.FeedbackScore;
Expand All @@ -13,13 +15,16 @@
import com.comet.opik.api.SpanUpdate;
import com.comet.opik.api.filter.FiltersFactory;
import com.comet.opik.api.filter.SpanFilter;
import com.comet.opik.domain.CommentDAO;
import com.comet.opik.domain.CommentService;
import com.comet.opik.domain.FeedbackScoreService;
import com.comet.opik.domain.SpanService;
import com.comet.opik.domain.SpanType;
import com.comet.opik.infrastructure.auth.RequestContext;
import com.comet.opik.infrastructure.ratelimit.RateLimited;
import com.comet.opik.utils.AsyncUtils;
import com.fasterxml.jackson.annotation.JsonView;
import io.dropwizard.jersey.errors.ErrorMessage;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.headers.Header;
import io.swagger.v3.oas.annotations.media.ArraySchema;
Expand Down Expand Up @@ -72,6 +77,7 @@ public class SpansResource {

private final @NonNull SpanService spanService;
private final @NonNull FeedbackScoreService feedbackScoreService;
private final @NonNull CommentService commentService;
private final @NonNull FiltersFactory filtersFactory;
private final @NonNull Provider<RequestContext> requestContext;

Expand Down Expand Up @@ -332,4 +338,90 @@ public Response findFeedbackScoreNames(@QueryParam("project_id") UUID projectId,
return Response.ok(feedbackScoreNames).build();
}

@POST
@Path("/{id}/comments")
@Operation(operationId = "addSpanComment", summary = "Add span comment", description = "Add span comment", responses = {
@ApiResponse(responseCode = "201", description = "Created", headers = {
@Header(name = "Location", required = true, example = "${basePath}/v1/private/spans/{spanId}/comments/{commentId}", schema = @Schema(implementation = String.class))})})
public Response addSpanComment(@PathParam("id") UUID id,
@RequestBody(content = @Content(schema = @Schema(implementation = Comment.class))) @NotNull @Valid Comment comment,
@Context UriInfo uriInfo) {

String workspaceId = requestContext.get().getWorkspaceId();

log.info("Add comment for span with id '{}' on workspaceId '{}'", id, workspaceId);

var commentId = commentService.create(id, comment, CommentDAO.EntityType.SPAN)
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
.block();

var uri = uriInfo.getAbsolutePathBuilder().path("/%s".formatted(commentId)).build();
log.info("Added comment with id '{}' for span with id '{}' on workspaceId '{}'", comment.id(), id,
workspaceId);

return Response.created(uri).build();
}

@GET
@Path("/{spanId}/comments/{commentId}")
@Operation(operationId = "getSpanComment", summary = "Get span comment", description = "Get span comment", responses = {
@ApiResponse(responseCode = "200", description = "Comment resource", content = @Content(schema = @Schema(implementation = Comment.class))),
@ApiResponse(responseCode = "404", description = "Not found", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
public Response getSpanComment(@PathParam("commentId") @NotNull UUID commentId,
@PathParam("spanId") @NotNull UUID spanId) {

String workspaceId = requestContext.get().getWorkspaceId();

log.info("Getting span comment by id '{}' on workspace_id '{}'", commentId, workspaceId);

Comment comment = commentService.get(spanId, commentId)
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
.block();

log.info("Got span comment by id '{}', on workspace_id '{}'", comment.id(), workspaceId);

return Response.ok(comment).build();
}

@PATCH
@Path("/comments/{commentId}")
@Operation(operationId = "updateSpanComment", summary = "Update span comment by id", description = "Update span comment by id", responses = {
@ApiResponse(responseCode = "204", description = "No Content"),
@ApiResponse(responseCode = "404", description = "Not found")})
public Response updateSpanComment(@PathParam("commentId") UUID commentId,
@RequestBody(content = @Content(schema = @Schema(implementation = Comment.class))) @NotNull @Valid Comment comment) {

String workspaceId = requestContext.get().getWorkspaceId();

log.info("Update span comment with id '{}' on workspaceId '{}'", commentId, workspaceId);

commentService.update(commentId, comment)
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
.block();

log.info("Updated span comment with id '{}' on workspaceId '{}'", commentId, workspaceId);

return Response.noContent().build();
}

@POST
@Path("/comments/delete")
@Operation(operationId = "deleteSpanComments", summary = "Delete span comments", description = "Delete span comments", responses = {
@ApiResponse(responseCode = "204", description = "No Content"),
})
public Response deleteSpanComments(
@NotNull @RequestBody(content = @Content(schema = @Schema(implementation = BatchDelete.class))) @Valid BatchDelete batchDelete) {

String workspaceId = requestContext.get().getWorkspaceId();

log.info("Delete span comments with ids '{}' on workspaceId '{}'", batchDelete.ids(), workspaceId);

commentService.delete(batchDelete)
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
.block();

log.info("Deleted span comments with ids '{}' on workspaceId '{}'", batchDelete.ids(), workspaceId);

return Response.noContent().build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.comet.opik.api.TraceUpdate;
import com.comet.opik.api.filter.FiltersFactory;
import com.comet.opik.api.filter.TraceFilter;
import com.comet.opik.domain.CommentDAO;
import com.comet.opik.domain.CommentService;
import com.comet.opik.domain.FeedbackScoreService;
import com.comet.opik.domain.TraceService;
Expand Down Expand Up @@ -270,7 +271,7 @@ public Response addTraceComment(@PathParam("id") UUID id,

log.info("Add comment for trace with id '{}' on workspaceId '{}'", id, workspaceId);

var commentId = commentService.create(id, comment)
var commentId = commentService.create(id, comment, CommentDAO.EntityType.TRACE)
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
.block();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@

import com.comet.opik.api.Comment;
import com.comet.opik.infrastructure.db.TransactionTemplateAsync;
import com.google.common.base.Preconditions;
import com.google.inject.ImplementedBy;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Statement;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.stringtemplate.v4.ST;
import reactor.core.publisher.Mono;

Expand All @@ -30,23 +29,24 @@ public interface CommentDAO {
@Getter
@RequiredArgsConstructor
enum EntityType {
TRACE("trace", "traces");
TRACE("trace", "traces"),
SPAN("span", "spans");

private final String type;
private final String tableName;
}

Mono<Long> addComment(UUID commentId, UUID entityId, UUID projectId, Comment comment);
Mono<Long> addComment(UUID commentId, UUID entityId, EntityType entityType, UUID projectId, Comment comment);

Mono<Comment> findById(UUID traceId, UUID commentId);
Mono<Comment> findById(UUID entityId, UUID commentId);

Mono<Void> updateComment(UUID commentId, Comment comment);

Mono<Void> deleteByIds(Set<UUID> commentIds);
Mono<Long> deleteByIds(Set<UUID> commentIds);

Mono<Void> deleteByEntityId(EntityType entityType, UUID entityId);
Mono<Long> deleteByEntityId(EntityType entityType, UUID entityId);

Mono<Void> deleteByEntityIds(EntityType entityType, Set<UUID> entityIds);
Mono<Long> deleteByEntityIds(EntityType entityType, Set<UUID> entityIds);
}

@Singleton
Expand All @@ -58,6 +58,7 @@ class CommentDAOImpl implements CommentDAO {
INSERT INTO comments(
id,
entity_id,
entity_type,
project_id,
workspace_id,
text,
Expand All @@ -68,6 +69,7 @@ INSERT INTO comments(
(
:id,
:entity_id,
:entity_type,
:project_id,
:workspace_id,
:text,
Expand Down Expand Up @@ -128,33 +130,34 @@ INSERT INTO comments (
private final @NonNull TransactionTemplateAsync asyncTemplate;

@Override
public Mono<Long> addComment(@NonNull UUID commentId, @NonNull UUID entityId, @NonNull UUID projectId,
public Mono<Long> addComment(@NonNull UUID commentId, @NonNull UUID entityId, @NonNull EntityType entityType,
@NonNull UUID projectId,
@NonNull Comment comment) {
return asyncTemplate.nonTransaction(connection -> {

var statement = connection.createStatement(INSERT_COMMENT);

bindParameters(commentId, entityId, projectId, comment, statement);
bindParameters(commentId, entityId, entityType, projectId, comment, statement);

return makeMonoContextAware(bindUserNameAndWorkspaceContext(statement))
.flatMap(result -> Mono.from(result.getRowsUpdated()));
});
}

@Override
public Mono<Comment> findById(UUID traceId, @NonNull UUID commentId) {
public Mono<Comment> findById(UUID entityId, @NonNull UUID commentId) {
return asyncTemplate.nonTransaction(connection -> {

var template = new ST(SELECT_COMMENT_BY_ID);
if (traceId != null) {
template.add("entity_id", traceId);
if (entityId != null) {
template.add("entity_id", entityId);
}

var statement = connection.createStatement(template.render())
.bind("id", commentId);

if (traceId != null) {
statement.bind("entity_id", traceId);
if (entityId != null) {
statement.bind("entity_id", entityId);
}

return makeFluxContextAware(bindWorkspaceIdToFlux(statement))
Expand All @@ -177,27 +180,29 @@ public Mono<Void> updateComment(@NonNull UUID commentId, @NonNull Comment commen
}

@Override
public Mono<Void> deleteByIds(@NonNull Set<UUID> commentIds) {
public Mono<Long> deleteByIds(@NonNull Set<UUID> commentIds) {
return asyncTemplate.nonTransaction(connection -> {

var statement = connection.createStatement(DELETE_COMMENT_BY_ID)
.bind("ids", commentIds);

return makeMonoContextAware(bindWorkspaceIdToMono(statement))
.then();
.flatMapMany(Result::getRowsUpdated)
.reduce(0L, Long::sum);
});
}

@Override
public Mono<Void> deleteByEntityId(@NonNull EntityType entityType, @NonNull UUID entityId) {
public Mono<Long> deleteByEntityId(@NonNull EntityType entityType, @NonNull UUID entityId) {
return deleteByEntityIds(entityType, Set.of(entityId));
}

@Override
public Mono<Void> deleteByEntityIds(@NonNull EntityType entityType, Set<UUID> entityIds) {
Preconditions.checkArgument(
CollectionUtils.isNotEmpty(entityIds), "Argument 'entityIds' must not be empty");
public Mono<Long> deleteByEntityIds(@NonNull EntityType entityType, @NonNull Set<UUID> entityIds) {
log.info("Deleting comments for entityType '{}', entityIds count '{}'", entityType, entityIds.size());
if (entityIds.isEmpty()) {
return Mono.just(0L);
}

return asyncTemplate.nonTransaction(connection -> {

Expand All @@ -206,13 +211,16 @@ public Mono<Void> deleteByEntityIds(@NonNull EntityType entityType, Set<UUID> en
.bind("entity_type", entityType.getType());

return makeMonoContextAware(bindWorkspaceIdToMono(statement))
.then();
.flatMapMany(Result::getRowsUpdated)
.reduce(0L, Long::sum);
});
}

private void bindParameters(UUID commentId, UUID entityId, UUID projectId, Comment comment, Statement statement) {
private void bindParameters(UUID commentId, UUID entityId, EntityType entityType, UUID projectId, Comment comment,
Statement statement) {
statement.bind("id", commentId)
.bind("entity_id", entityId)
.bind("entity_type", entityType.getType())
.bind("project_id", projectId)
.bind("text", comment.text());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import static com.comet.opik.utils.ValidationUtils.CLICKHOUSE_FIXED_STRING_UUID_FIELD_NULL_VALUE;

@UtilityClass
public class CommentResultMapper {
class CommentResultMapper {

static Publisher<Comment> mapItem(Result results) {
return results.map((row, rowMetadata) -> Comment.builder()
Expand Down
Loading

0 comments on commit 0fd113e

Please sign in to comment.