Skip to content

Commit

Permalink
Sync client should call CloseSend when done sending (#5884)
Browse files Browse the repository at this point in the history
* Sync client should call CloseSend when done

Signed-off-by: Rafael Raposo <[email protected]>

* No capital on error

Signed-off-by: Rafael Raposo <[email protected]>

* No capital on error

Signed-off-by: Rafael Raposo <[email protected]>

---------

Signed-off-by: Rafael Raposo <[email protected]>
  • Loading branch information
RRap0so authored Oct 22, 2024
1 parent 56b6d6d commit 71386f8
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,15 @@ func (p *Plugin) ExecuteTaskSync(
return nil, nil, fmt.Errorf("failed to send inputsProto with error: %w", err)
}

// Client is done with sending
if err := stream.CloseSend(); err != nil {
logger.Errorf(ctx, "failed to close stream with err %s", err.Error())
return nil, nil, err
}

in, err := stream.Recv()
if err != nil {
logger.Errorf(ctx, "Failed to write output with err %s", err.Error())
logger.Errorf(ctx, "failed to write output with err %s", err.Error())
return nil, nil, err
}
if in.GetHeader() == nil {
Expand All @@ -188,11 +194,6 @@ func (p *Plugin) ExecuteTaskSync(
// For now, Propeller assumes that the output is always in the header.
resource := in.GetHeader().GetResource()

if err := stream.CloseSend(); err != nil {
logger.Errorf(ctx, "Failed to close stream with err %s", err.Error())
return nil, nil, err
}

return nil, ResourceWrapper{
Phase: resource.Phase,
Outputs: resource.Outputs,
Expand Down Expand Up @@ -272,7 +273,7 @@ func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phas
case flyteIdl.TaskExecution_SUCCEEDED:
err = writeOutput(ctx, taskCtx, resource.Outputs)
if err != nil {
logger.Errorf(ctx, "Failed to write output with err %s", err.Error())
logger.Errorf(ctx, "failed to write output with err %s", err.Error())
return core.PhaseInfoUndefined, err
}
return core.PhaseInfoSuccess(taskInfo), nil
Expand Down Expand Up @@ -300,7 +301,7 @@ func (p *Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phas
case admin.State_SUCCEEDED:
err = writeOutput(ctx, taskCtx, resource.Outputs)
if err != nil {
logger.Errorf(ctx, "Failed to write output with err %s", err.Error())
logger.Errorf(ctx, "failed to write output with err %s", err.Error())
return core.PhaseInfoUndefined, err
}
return core.PhaseInfoSuccess(taskInfo), nil
Expand Down

0 comments on commit 71386f8

Please sign in to comment.