Skip to content

Commit

Permalink
backend: WIP on v1 implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
bojand committed Feb 14, 2025
1 parent 56efff0 commit bdd01c0
Show file tree
Hide file tree
Showing 10 changed files with 1,006 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ import (

v1alpha1 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/console/v1alpha1"
"github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/console/v1alpha1/consolev1alpha1connect"
"github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1alpha2/dataplanev1alpha2connect"
"github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1/dataplanev1connect"
)

var _ consolev1alpha1connect.TransformServiceHandler = (*ConsoleService)(nil)

// ConsoleService is the implementation of the transform service.
// This is mainly a wrapper of dataplane API to be used for consumption by the Console frontend.
type ConsoleService struct {
Impl dataplanev1alpha2connect.TransformServiceHandler
Impl dataplanev1connect.TransformServiceHandler
}

// ListTransforms lists the transforms.
Expand Down
21 changes: 21 additions & 0 deletions backend/pkg/api/connect/service/transform/v1/defaulter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package transform

import v1 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1"

// Defaulter updates a given transforms request with defaults.
type defaulter struct{}

func (*defaulter) applyListTransformsRequest(req *v1.ListTransformsRequest) {
if req.GetPageSize() == 0 {
req.PageSize = 100
}
}
209 changes: 209 additions & 0 deletions backend/pkg/api/connect/service/transform/v1/handle_transforms.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright 2022 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package transform

import (
"errors"
"fmt"
"io"
"net/http"

commonv1alpha1 "buf.build/gen/go/redpandadata/common/protocolbuffers/go/redpanda/api/common/v1alpha1"
"connectrpc.com/connect"
"github.com/bufbuild/protovalidate-go"
"go.uber.org/zap"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"

apierrors "github.com/redpanda-data/console/backend/pkg/api/connect/errors"
v1 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1"
)

const (
// Define how many bytes are in a kilobyte (KiB) and a megabyte (MiB)
kib int64 = 1024
mib int64 = 1024 * kib
)

// HandleDeployTransform is the HTTP handler for deploying WASM transforms in Redpanda.
// Because we use multipart/form-data for uploading the binary file (up to 50mb), we did
// not use gRPC/protobuf for this.
func (s *Service) HandleDeployTransform() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !s.cfg.Redpanda.AdminAPI.Enabled {
s.writeError(w, r, apierrors.NewRedpandaAdminAPINotConfiguredError())
return
}

// 1. Parse input data that is sent using Multipart form encoding.
if r.ContentLength == 0 {
s.writeError(w, r, apierrors.NewConnectError(
connect.CodeInvalidArgument,
fmt.Errorf("request body must be a valid multipart/form-data payload, but sent body is empty"),
apierrors.NewErrorInfo(commonv1alpha1.Reason_REASON_INVALID_INPUT.String()),
))
return
}

// The max default binary upload size is 10MiB. Because this does not include
// the metadata we added 5KiB as a limit.
if err := r.ParseMultipartForm(10*mib + 5*kib); err != nil {
s.writeError(w, r, apierrors.NewConnectError(
connect.CodeInvalidArgument,
fmt.Errorf("could not parse multipart form: %w", err),
apierrors.NewErrorInfo(commonv1alpha1.Reason_REASON_INVALID_INPUT.String()),
))
return
}

metadataJSON := r.FormValue("metadata")
if metadataJSON == "" {
s.writeError(w, r, apierrors.NewConnectError(
connect.CodeInvalidArgument,
fmt.Errorf("could not find or parse form field metadata"),
apierrors.NewErrorInfo(commonv1alpha1.Reason_REASON_INVALID_INPUT.String()),
))
return
}

wasmForm, _, err := r.FormFile("wasm_binary")
if err != nil {
s.writeError(w, r, apierrors.NewConnectError(
connect.CodeInvalidArgument,
fmt.Errorf("could not find or parse form field wasm_binary: %w", err),
apierrors.NewErrorInfo(commonv1alpha1.Reason_REASON_INVALID_INPUT.String()),
))
return
}
defer wasmForm.Close()

wasmBinary, err := io.ReadAll(wasmForm)
if err != nil {
s.writeError(w, r, apierrors.NewConnectError(
connect.CodeInvalidArgument,
fmt.Errorf("could not read wasm binary: %w", err),
apierrors.NewErrorInfo(commonv1alpha1.Reason_REASON_INVALID_INPUT.String()),
))
return
}

// 2. Parse and validate request parameters
var deployTransformReq v1.DeployTransformRequest
err = protojson.UnmarshalOptions{}.Unmarshal([]byte(metadataJSON), &deployTransformReq)
if err != nil {
s.writeError(w, r, apierrors.NewConnectError(
connect.CodeInvalidArgument,
fmt.Errorf("unable to parse form field metadata: %w", err),
apierrors.NewErrorInfo(commonv1alpha1.Reason_REASON_INVALID_INPUT.String()),
))
return
}
if err := s.validateProtoMessage(&deployTransformReq); err != nil {
s.writeError(w, r, err)
return
}

// 3. Deploy WASM transform by calling the Redpanda Admin API
if err := s.redpandaSvc.DeployWasmTransform(r.Context(), s.mapper.deployTransformReqToAdminAPI(&deployTransformReq), wasmBinary); err != nil {
connectErr := apierrors.NewConnectErrorFromRedpandaAdminAPIError(err, "could not deploy wasm transform: ")
s.writeError(w, r, connectErr)
return
}

// 4. List transforms and find the just deployed transform from the response
transforms, err := s.redpandaSvc.ListWasmTransforms(r.Context())
if err != nil {
connectErr := apierrors.NewConnectErrorFromRedpandaAdminAPIError(
err,
"deployed wasm transform, but could not list wasm transforms from Redpanda cluster: ",
)
s.writeError(w, r, connectErr)
return
}

transformsProto, err := s.mapper.transformMetadataToProto(transforms)
if err != nil {
s.writeError(w, r, apierrors.NewConnectError(
connect.CodeInternal,
fmt.Errorf("deployed wasm transform, but failed to map list response to proto: %w", err),
apierrors.NewErrorInfo(v1.Reason_REASON_TYPE_MAPPING_ERROR.String()),
))
return
}

transformProto, err := findExactTransformByName(transformsProto, deployTransformReq.Name)
if err != nil {
s.writeError(w, r, apierrors.NewConnectError(
connect.CodeInternal,
fmt.Errorf("deployed wasm transform, but failed to list it afterwards"),
apierrors.NewErrorInfo(v1.Reason_REASON_TYPE_MAPPING_ERROR.String()),
))
return
}

// 5. Write found transform proto as JSON
jsonBytes, err := protojson.MarshalOptions{UseProtoNames: true}.Marshal(transformProto)
if err != nil {
s.writeError(w, r, apierrors.NewConnectError(
connect.CodeInternal,
fmt.Errorf("deployed wasm transform, but failed to serialize response into JSON: %w", err),
apierrors.NewErrorInfo(v1.Reason_REASON_TYPE_MAPPING_ERROR.String()),
))
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
if _, err := w.Write(jsonBytes); err != nil {
s.logger.Error("failed to write response to deploy wasm transform request", zap.Error(err))
}
}
}

// validateProtoMessage validates a given proto message using its
// validate rules which are defined as part of the proto message.
// This is usually done inside an interceptor, however HandleDeployTransform
// is special as it's not using the connect gateway.
func (s *Service) validateProtoMessage(msg proto.Message) error {
err := s.validator.Validate(msg)
if err == nil {
return nil
}

var badRequest *errdetails.BadRequest
var validationErr *protovalidate.ValidationError
var runtimeErr *protovalidate.RuntimeError
var compilationErr *protovalidate.CompilationError

switch {
case errors.As(err, &validationErr):
var fieldViolations []*errdetails.BadRequest_FieldViolation
for _, violation := range validationErr.Violations {
fieldViolationErr := &errdetails.BadRequest_FieldViolation{
Field: protovalidate.FieldPathString(violation.Proto.GetField()),
Description: violation.Proto.GetMessage(),
}
fieldViolations = append(fieldViolations, fieldViolationErr)
}
badRequest = apierrors.NewBadRequest(fieldViolations...)
case errors.As(err, &runtimeErr):
s.logger.Error("validation runtime error", zap.Error(runtimeErr))
case errors.As(err, &compilationErr):
s.logger.Error("validation compilation error", zap.Error(compilationErr))
}

return apierrors.NewConnectError(
connect.CodeInvalidArgument,
errors.New("provided parameters are invalid"),
apierrors.NewErrorInfo(commonv1alpha1.Reason_REASON_INVALID_INPUT.String()),
badRequest, // This may be nil, but that's okay.
)
}
98 changes: 98 additions & 0 deletions backend/pkg/api/connect/service/transform/v1/mapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package transform

import (
"fmt"

adminapi "github.com/redpanda-data/common-go/rpadmin"

v1 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1"
)

type mapper struct{}

func (*mapper) partitionTransformStatusToProto(s string) (v1.PartitionTransformStatus_PartitionStatus, error) {
switch s {
case "running":
return v1.PartitionTransformStatus_PARTITION_STATUS_RUNNING, nil
case "inactive":
return v1.PartitionTransformStatus_PARTITION_STATUS_INACTIVE, nil
case "errored":
return v1.PartitionTransformStatus_PARTITION_STATUS_ERRORED, nil
case "unknown":
return v1.PartitionTransformStatus_PARTITION_STATUS_UNKNOWN, nil
default:
return v1.PartitionTransformStatus_PARTITION_STATUS_UNSPECIFIED, fmt.Errorf("unable to convert %q to a known string that can be handled by the Redpanda Admin API", s)
}
}

func (m *mapper) transformMetadataToProto(transforms []adminapi.TransformMetadata) ([]*v1.TransformMetadata, error) {
apiTransforms := make([]*v1.TransformMetadata, 0, len(transforms))
for _, transform := range transforms {
statuses := make([]*v1.PartitionTransformStatus, len(transform.Status))
for i, transformStatusWithMetadata := range transform.Status {
p, err := m.transformStatusWithMetadataToProto(transformStatusWithMetadata)
if err != nil {
return nil, fmt.Errorf("unable to convert transform status: %w", err)
}
statuses[i] = p
}

envVars := make([]*v1.TransformMetadata_EnvironmentVariable, len(transform.Environment))
for i, keyVal := range transform.Environment {
envVars[i] = &v1.TransformMetadata_EnvironmentVariable{
Key: keyVal.Key,
Value: keyVal.Value,
}
}

apiTransforms = append(apiTransforms, &v1.TransformMetadata{
Name: transform.Name,
InputTopicName: transform.InputTopic,
OutputTopicNames: transform.OutputTopics,
Statuses: statuses,
EnvironmentVariables: envVars,
})
}
return apiTransforms, nil
}

func (m *mapper) transformStatusWithMetadataToProto(transformStatusWithMetadata adminapi.PartitionTransformStatus) (*v1.PartitionTransformStatus, error) {
status, err := m.partitionTransformStatusToProto(transformStatusWithMetadata.Status)
if err != nil {
return nil, err
}

return &v1.PartitionTransformStatus{
BrokerId: int32(transformStatusWithMetadata.NodeID),
Status: status,
Lag: int32(transformStatusWithMetadata.Lag),
PartitionId: int32(transformStatusWithMetadata.Partition),
}, nil
}

func (*mapper) deployTransformReqToAdminAPI(req *v1.DeployTransformRequest) adminapi.TransformMetadata {
envVars := make([]adminapi.EnvironmentVariable, len(req.EnvironmentVariables))
for i, keyVal := range req.EnvironmentVariables {
envVars[i] = adminapi.EnvironmentVariable{
Key: keyVal.Key,
Value: keyVal.Value,
}
}

return adminapi.TransformMetadata{
Name: req.Name,
InputTopic: req.InputTopicName,
OutputTopics: req.OutputTopicNames,
Status: nil, // Output only
Environment: envVars,
}
}
Loading

0 comments on commit bdd01c0

Please sign in to comment.