feat: add parallel bulk processing #1568
base: main
Conversation
Signed-off-by: Clément Salaün <[email protected]>
Walkthrough

The changes add a `parallel` option to bulk request processing: the controller extracts the ledger from the request and calls `ProcessBulk` with a `parallel` parameter, which selects between sequential and concurrent processing of the bulk elements.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Controller
    participant Backend
    participant Ledger
    Client ->> Controller: Send bulk request
    Controller ->> Backend: Extract ledger
    Controller ->> Ledger: Call ProcessBulk with `parallel` parameter
    alt Sequential Processing
        Ledger ->> Ledger: Process each element in order
    else Parallel Processing
        Ledger ->> Ledger: Process elements concurrently
    end
    Ledger ->> Controller: Return results
    Controller ->> Client: Send response
```
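The sequential/parallel branch in the diagram above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: `Element`, `Result`, and `processElement` are hypothetical stand-ins for the ledger's types, and results keep their input order because each goroutine writes only its own index.

```go
package main

import (
	"fmt"
	"sync"
)

// Element and Result are hypothetical stand-ins for the ledger's bulk types.
type Element struct{ Action string }
type Result struct{ Data string }

// processElement simulates handling one bulk element.
func processElement(e Element) Result {
	return Result{Data: "done:" + e.Action}
}

// processBulk processes elements in order, or concurrently when parallel is true.
// Each goroutine writes only its own slice index, so output order is preserved.
func processBulk(elements []Element, parallel bool) []Result {
	ret := make([]Result, len(elements))
	if !parallel {
		for i, e := range elements {
			ret[i] = processElement(e)
		}
		return ret
	}
	var wg sync.WaitGroup
	for i, e := range elements {
		wg.Add(1)
		go func(i int, e Element) {
			defer wg.Done()
			ret[i] = processElement(e)
		}(i, e)
	}
	wg.Wait()
	return ret
}

func main() {
	res := processBulk([]Element{{"CREATE_TRANSACTION"}, {"ADD_METADATA"}}, true)
	fmt.Println(res[0].Data, res[1].Data)
}
```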
Actionable comments posted: 1
Outside diff range and nitpick comments (1)
components/ledger/internal/api/v2/bulk.go (1)
172-231: Excellent implementation of parallel processing. Consider adding a timeout mechanism to prevent potential deadlocks or long-running goroutines. A timeout can be implemented using `context.WithTimeout` to ensure that all goroutines complete within a reasonable timeframe:

```go
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
...
if ctx.Err() == context.DeadlineExceeded {
	return nil, true, fmt.Errorf("processing timed out")
}
```
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- components/ledger/internal/api/v2/bulk.go (2 hunks)
- components/ledger/internal/api/v2/controllers_bulk.go (1 hunks)
Additional comments not posted (1)
components/ledger/internal/api/v2/controllers_bulk.go (1)
20-26: LGTM! Ensure proper error handling and testing for the new parallel processing feature.

Verification successful
The `bulkHandler` function implementation includes error handling for the `ProcessBulk` call, which now takes the `parallel` parameter. The function checks for errors and handles them appropriately by setting the HTTP status to `BadRequest` if an error occurs.

The `bulkHandler` function:
- Decodes the request body into a `Bulk` struct.
- Calls `ProcessBulk` with the `parallel` parameter.
- Checks for errors (`err != nil || errorsInBulk`).
- Sets `BadRequest` status if errors are present.
- Encodes the response, or panics if encoding fails.

The error handling appears to be in place for the parallel processing feature.
Scripts executed

The following scripts were executed for the analysis:

Script:

```bash
#!/bin/bash
# Description: Verify error handling for parallel processing in `bulkHandler`.
# Test: Search for error handling patterns in `bulkHandler`.
rg --type go $'bulkHandler' 'components/ledger/internal/api/v2/controllers_bulk.go'
```

Length of output: 139

Script:

```bash
#!/bin/bash
# Description: Extract the full implementation of `bulkHandler` to verify error handling for parallel processing.
# Search for the complete implementation of `bulkHandler` function.
ast-grep --lang go --pattern $'func bulkHandler($_, $_) { $$$ }'
```

Length of output: 2302
```go
	} else {
		return nil, "", tx
	}
case ActionAddMetadata:
	type addMetadataRequest struct {
		TargetType string            `json:"targetType"`
		TargetID   json.RawMessage   `json:"targetId"`
		Metadata   metadata.Metadata `json:"metadata"`
	}
	req := &addMetadataRequest{}
	if err := json.Unmarshal(element.Data, req); err != nil {
		return fmt.Errorf("error parsing element %d: %s", i, err), "", nil
	}

	var targetID any
	switch req.TargetType {
	case ledger.MetaTargetTypeAccount:
		targetID = ""
	case ledger.MetaTargetTypeTransaction:
		targetID = big.NewInt(0)
	}
	if err := json.Unmarshal(req.TargetID, &targetID); err != nil {
		return err, "", nil
	}

	if err := l.SaveMeta(ctx, parameters, req.TargetType, targetID, req.Metadata); err != nil {
		var code string
		switch {
		case command.IsSaveMetaError(err, command.ErrSaveMetaCodeTransactionNotFound):
			code = sharedapi.ErrorCodeNotFound
		default:
			code = sharedapi.ErrorInternal
		}
		return err, code, nil
	} else {
		return nil, "", nil
	}
case ActionRevertTransaction:
	type revertTransactionRequest struct {
		ID              *big.Int `json:"id"`
		Force           bool     `json:"force"`
		AtEffectiveDate bool     `json:"atEffectiveDate"`
	}
	req := &revertTransactionRequest{}
	if err := json.Unmarshal(element.Data, req); err != nil {
		return fmt.Errorf("error parsing element %d: %s", i, err), "", nil
	}

	tx, err := l.RevertTransaction(ctx, parameters, req.ID, req.Force, req.AtEffectiveDate)
	if err != nil {
		var code string
		switch {
		case engine.IsCommandError(err):
			code = ErrValidation
		default:
			code = sharedapi.ErrorInternal
		}
		return err, code, nil
	} else {
		return nil, "", tx
	}
case ActionDeleteMetadata:
	type deleteMetadataRequest struct {
		TargetType string          `json:"targetType"`
		TargetID   json.RawMessage `json:"targetId"`
		Key        string          `json:"key"`
	}
	req := &deleteMetadataRequest{}
	if err := json.Unmarshal(element.Data, req); err != nil {
		return err, "", nil
	}

	var targetID any
	switch req.TargetType {
	case ledger.MetaTargetTypeAccount:
		targetID = ""
	case ledger.MetaTargetTypeTransaction:
		targetID = big.NewInt(0)
	}
	if err := json.Unmarshal(req.TargetID, &targetID); err != nil {
		return err, "", nil
	}

	err := l.DeleteMetadata(ctx, parameters, req.TargetType, targetID, req.Key)
	if err != nil {
		var code string
		switch {
		case command.IsDeleteMetaError(err, command.ErrSaveMetaCodeTransactionNotFound):
			code = sharedapi.ErrorCodeNotFound
		default:
			code = sharedapi.ErrorInternal
		}
		return err, code, nil
	} else {
		return nil, "", nil
	}
}

return fmt.Errorf("unknown action %s", element.Action), "", nil
```
Well-structured function for processing individual bulk elements. Consider adding detailed logging for each action type to improve traceability.
```diff
+ import "log"

  switch element.Action {
  case ActionCreateTransaction:
+     log.Printf("Processing transaction: %s", element.IdempotencyKey)
      ...
  case ActionAddMetadata:
+     log.Printf("Adding metadata: %s", element.IdempotencyKey)
      ...
  case ActionRevertTransaction:
+     log.Printf("Reverting transaction: %s", element.IdempotencyKey)
      ...
  case ActionDeleteMetadata:
+     log.Printf("Deleting metadata: %s", element.IdempotencyKey)
      ...
  }
```
Committable suggestion was skipped due to low confidence.
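As a standalone sketch of that logging suggestion: the action string values and the `actionMessage` helper below are invented for illustration and are not the PR's actual constants.

```go
package main

import (
	"log"
	"os"
)

// Element is a hypothetical stand-in for the bulk element type in the PR.
type Element struct {
	Action         string
	IdempotencyKey string
}

// actionMessage builds the per-action log line suggested in the review.
func actionMessage(e Element) string {
	switch e.Action {
	case "CREATE_TRANSACTION":
		return "Processing transaction: " + e.IdempotencyKey
	case "ADD_METADATA":
		return "Adding metadata: " + e.IdempotencyKey
	case "REVERT_TRANSACTION":
		return "Reverting transaction: " + e.IdempotencyKey
	case "DELETE_METADATA":
		return "Deleting metadata: " + e.IdempotencyKey
	}
	return "Unknown action: " + e.Action
}

func main() {
	// No prefix or timestamp, so the output is deterministic.
	logger := log.New(os.Stdout, "", 0)
	logger.Println(actionMessage(Element{Action: "ADD_METADATA", IdempotencyKey: "ik-1"}))
}
```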
```go
		errorsInBulk = true
	}

	var bulkSuccess = func(index int, action string, data any) {
		ret[index] = Result{
```
Since you allow parallelism, you need to use a lock on the `ret` variable.
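A minimal sketch of that fix, with illustrative names (`Result`, `runBulkCallbacks`, and the empty-string failure convention are not the PR's actual identifiers): a `sync.Mutex` serializes the callbacks' writes to the shared `errorsInBulk` flag and the `ret` slice.

```go
package main

import (
	"fmt"
	"sync"
)

// Result is a hypothetical stand-in for the bulk result type.
type Result struct{ Data string }

// runBulkCallbacks processes actions concurrently; an empty action string
// stands in for a failing element. The mutex guards both shared values.
func runBulkCallbacks(actions []string) ([]Result, bool) {
	ret := make([]Result, len(actions))
	errorsInBulk := false

	var mu sync.Mutex // guards ret and errorsInBulk across goroutines

	// bulkError and bulkSuccess mirror the callbacks in the diff, with locking added.
	bulkError := func(index int) {
		mu.Lock()
		defer mu.Unlock()
		ret[index] = Result{Data: "ERROR"}
		errorsInBulk = true
	}
	bulkSuccess := func(index int, data string) {
		mu.Lock()
		defer mu.Unlock()
		ret[index] = Result{Data: data}
	}

	var wg sync.WaitGroup
	for i, a := range actions {
		wg.Add(1)
		go func(i int, a string) {
			defer wg.Done()
			if a == "" {
				bulkError(i)
				return
			}
			bulkSuccess(i, "done:"+a)
		}(i, a)
	}
	wg.Wait()
	return ret, errorsInBulk
}

func main() {
	ret, errs := runBulkCallbacks([]string{"A", "", "B"})
	fmt.Println(errs, ret[1].Data)
}
```

Note that in Go, concurrent writes to distinct indices of a pre-allocated slice are safe on their own; it is the shared `errorsInBulk` boolean that makes the lock necessary here.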