Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add parallel bulk processing #1568

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
312 changes: 168 additions & 144 deletions components/ledger/internal/api/v2/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@ import (
"encoding/json"
"fmt"
"math/big"
"sync"

"github.com/formancehq/ledger/internal/opentelemetry/tracer"

sharedapi "github.com/formancehq/stack/libs/go-libs/api"

ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/engine"
"github.com/formancehq/ledger/internal/machine"
"github.com/formancehq/ledger/internal/opentelemetry/tracer"
sharedapi "github.com/formancehq/stack/libs/go-libs/api"
"github.com/formancehq/stack/libs/go-libs/metadata"

ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/api/backend"
"github.com/formancehq/ledger/internal/engine/command"
"github.com/formancehq/stack/libs/go-libs/metadata"
)

const (
Expand All @@ -42,168 +41,193 @@ type Result struct {
ResponseType string `json:"responseType"` // Added for sdk generation (discriminator in oneOf)
}

func ProcessBulk(ctx context.Context, l backend.Ledger, bulk Bulk, continueOnFailure bool) ([]Result, bool, error) {
func processBulkElement(
ctx context.Context,
l backend.Ledger,
parameters command.Parameters,
element Element,
i int,
) (error, string, any) {
switch element.Action {
case ActionCreateTransaction:
req := &ledger.TransactionRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return fmt.Errorf("error parsing element %d: %s", i, err), "", nil
}
rs := req.ToRunScript()

tx, err := l.CreateTransaction(ctx, parameters, *rs)
if err != nil {
var code string
switch {
case machine.IsInsufficientFundError(err):
code = ErrInsufficientFund
case engine.IsCommandError(err):
code = ErrValidation
default:
code = sharedapi.ErrorInternal
}
return err, code, nil
} else {
return nil, "", tx
}
case ActionAddMetadata:
type addMetadataRequest struct {
TargetType string `json:"targetType"`
TargetID json.RawMessage `json:"targetId"`
Metadata metadata.Metadata `json:"metadata"`
}
req := &addMetadataRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return fmt.Errorf("error parsing element %d: %s", i, err), "", nil
}

var targetID any
switch req.TargetType {
case ledger.MetaTargetTypeAccount:
targetID = ""
case ledger.MetaTargetTypeTransaction:
targetID = big.NewInt(0)
}
if err := json.Unmarshal(req.TargetID, &targetID); err != nil {
return err, "", nil
}

if err := l.SaveMeta(ctx, parameters, req.TargetType, targetID, req.Metadata); err != nil {
var code string
switch {
case command.IsSaveMetaError(err, command.ErrSaveMetaCodeTransactionNotFound):
code = sharedapi.ErrorCodeNotFound
default:
code = sharedapi.ErrorInternal
}
return err, code, nil
} else {
return nil, "", nil
}
case ActionRevertTransaction:
type revertTransactionRequest struct {
ID *big.Int `json:"id"`
Force bool `json:"force"`
AtEffectiveDate bool `json:"atEffectiveDate"`
}
req := &revertTransactionRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return fmt.Errorf("error parsing element %d: %s", i, err), "", nil
}

tx, err := l.RevertTransaction(ctx, parameters, req.ID, req.Force, req.AtEffectiveDate)
if err != nil {
var code string
switch {
case engine.IsCommandError(err):
code = ErrValidation
default:
code = sharedapi.ErrorInternal
}
return err, code, nil
} else {
return nil, "", tx
}
case ActionDeleteMetadata:
type deleteMetadataRequest struct {
TargetType string `json:"targetType"`
TargetID json.RawMessage `json:"targetId"`
Key string `json:"key"`
}
req := &deleteMetadataRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return err, "", nil
}

var targetID any
switch req.TargetType {
case ledger.MetaTargetTypeAccount:
targetID = ""
case ledger.MetaTargetTypeTransaction:
targetID = big.NewInt(0)
}
if err := json.Unmarshal(req.TargetID, &targetID); err != nil {
return err, "", nil
}

err := l.DeleteMetadata(ctx, parameters, req.TargetType, targetID, req.Key)
if err != nil {
var code string
switch {
case command.IsDeleteMetaError(err, command.ErrSaveMetaCodeTransactionNotFound):
code = sharedapi.ErrorCodeNotFound
default:
code = sharedapi.ErrorInternal
}
return err, code, nil
} else {
return nil, "", nil
}
}

return fmt.Errorf("unknown action %s", element.Action), "", nil
}
Comment on lines +44 to +170
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well-structured function for processing individual bulk elements. Consider adding detailed logging for each action type to improve traceability.

+ import "log"

  switch element.Action {
  case ActionCreateTransaction:
+   log.Printf("Processing transaction: %s", element.IdempotencyKey)
    ...
  case ActionAddMetadata:
+   log.Printf("Adding metadata: %s", element.IdempotencyKey)
    ...
  case ActionRevertTransaction:
+   log.Printf("Reverting transaction: %s", element.IdempotencyKey)
    ...
  case ActionDeleteMetadata:
+   log.Printf("Deleting metadata: %s", element.IdempotencyKey)
    ...
  }

Committable suggestion was skipped due to low confidence.


func ProcessBulk(
ctx context.Context,
ledger backend.Ledger,
bulk Bulk,
continueOnFailure bool,
parallel bool,
) ([]Result, bool, error) {

ctx, span := tracer.Start(ctx, "Bulk")
defer span.End()

ret := make([]Result, 0, len(bulk))
ret := make([]Result, len(bulk))

errorsInBulk := false
var bulkError = func(action, code string, err error) {
ret = append(ret, Result{

var bulkError = func(index int, action, code string, err error) {
ret[index] = Result{
ErrorCode: code,
ErrorDescription: err.Error(),
ResponseType: "ERROR",
})
}
errorsInBulk = true
}

var bulkSuccess = func(index int, action string, data any) {
ret[index] = Result{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you allow parallelism, you need to use a lock on the ret variable

Data: data,
ResponseType: action,
}
}

wg := sync.WaitGroup{}

for i, element := range bulk {
parameters := command.Parameters{
DryRun: false,
IdempotencyKey: element.IdempotencyKey,
}

switch element.Action {
case ActionCreateTransaction:
req := &ledger.TransactionRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return nil, errorsInBulk, fmt.Errorf("error parsing element %d: %s", i, err)
}
rs := req.ToRunScript()
wg.Add(1)

tx, err := l.CreateTransaction(ctx, parameters, *rs)
go func(element Element, index int) {
err, code, data := processBulkElement(ctx, ledger, parameters, element, index)
if err != nil {
var code string
switch {
case machine.IsInsufficientFundError(err):
code = ErrInsufficientFund
case engine.IsCommandError(err):
code = ErrValidation
default:
code = sharedapi.ErrorInternal
}
bulkError(element.Action, code, err)
if !continueOnFailure {
return ret, errorsInBulk, nil
}
bulkError(index, element.Action, code, err)
} else {
ret = append(ret, Result{
Data: tx,
ResponseType: element.Action,
})
}
case ActionAddMetadata:
type addMetadataRequest struct {
TargetType string `json:"targetType"`
TargetID json.RawMessage `json:"targetId"`
Metadata metadata.Metadata `json:"metadata"`
}
req := &addMetadataRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return nil, errorsInBulk, fmt.Errorf("error parsing element %d: %s", i, err)
bulkSuccess(index, element.Action, data)
}
wg.Done()
}(element, i)

var targetID any
switch req.TargetType {
case ledger.MetaTargetTypeAccount:
targetID = ""
case ledger.MetaTargetTypeTransaction:
targetID = big.NewInt(0)
}
if err := json.Unmarshal(req.TargetID, &targetID); err != nil {
return nil, errorsInBulk, err
}

if err := l.SaveMeta(ctx, parameters, req.TargetType, targetID, req.Metadata); err != nil {
var code string
switch {
case command.IsSaveMetaError(err, command.ErrSaveMetaCodeTransactionNotFound):
code = sharedapi.ErrorCodeNotFound
default:
code = sharedapi.ErrorInternal
}
bulkError(element.Action, code, err)
if !continueOnFailure {
return ret, errorsInBulk, nil
}
} else {
ret = append(ret, Result{
ResponseType: element.Action,
})
}
case ActionRevertTransaction:
type revertTransactionRequest struct {
ID *big.Int `json:"id"`
Force bool `json:"force"`
AtEffectiveDate bool `json:"atEffectiveDate"`
}
req := &revertTransactionRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return nil, errorsInBulk, fmt.Errorf("error parsing element %d: %s", i, err)
}

tx, err := l.RevertTransaction(ctx, parameters, req.ID, req.Force, req.AtEffectiveDate)
if err != nil {
var code string
switch {
case engine.IsCommandError(err):
code = ErrValidation
default:
code = sharedapi.ErrorInternal
}
bulkError(element.Action, code, err)
if !continueOnFailure {
return ret, errorsInBulk, nil
}
} else {
ret = append(ret, Result{
Data: tx,
ResponseType: element.Action,
})
}
case ActionDeleteMetadata:
type deleteMetadataRequest struct {
TargetType string `json:"targetType"`
TargetID json.RawMessage `json:"targetId"`
Key string `json:"key"`
}
req := &deleteMetadataRequest{}
if err := json.Unmarshal(element.Data, req); err != nil {
return nil, errorsInBulk, fmt.Errorf("error parsing element %d: %s", i, err)
}

var targetID any
switch req.TargetType {
case ledger.MetaTargetTypeAccount:
targetID = ""
case ledger.MetaTargetTypeTransaction:
targetID = big.NewInt(0)
}
if err := json.Unmarshal(req.TargetID, &targetID); err != nil {
return nil, errorsInBulk, err
}

err := l.DeleteMetadata(ctx, parameters, req.TargetType, targetID, req.Key)
if err != nil {
var code string
switch {
case command.IsDeleteMetaError(err, command.ErrSaveMetaCodeTransactionNotFound):
code = sharedapi.ErrorCodeNotFound
default:
code = sharedapi.ErrorInternal
}
bulkError(element.Action, code, err)
if !continueOnFailure {
return ret, errorsInBulk, nil
}
} else {
ret = append(ret, Result{
ResponseType: element.Action,
})
}
if !parallel {
wg.Wait()
}
}

if parallel {
wg.Wait()
}

return ret, errorsInBulk, nil
}
10 changes: 9 additions & 1 deletion components/ledger/internal/api/v2/controllers_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@ func bulkHandler(w http.ResponseWriter, r *http.Request) {
}

w.Header().Set("Content-Type", "application/json")
ret, errorsInBulk, err := ProcessBulk(r.Context(), backend.LedgerFromContext(r.Context()), b, sharedapi.QueryParamBool(r, "continueOnFailure"))

ret, errorsInBulk, err := ProcessBulk(
r.Context(),
backend.LedgerFromContext(r.Context()),
b,
sharedapi.QueryParamBool(r, "continueOnFailure"),
sharedapi.QueryParamBool(r, "parallel"),
)

if err != nil || errorsInBulk {
w.WriteHeader(http.StatusBadRequest)
}
Expand Down
Loading