Skip to content

Commit

Permalink
feat(deployments): add concurrency support (#306)
Browse files Browse the repository at this point in the history
* initial commit for concurrency support

* Change ConcurrencyOptions from JSON to SingleNestedAttribute

This object in the API is pretty simple, with only one nested attribute.
It's not so flexible that we need to default to a basic JSON object.

* Validate that concurrency_limit value is >0

Per the API docs

* Fix concurrency limit return value

Concurrency limit is still configured via 'concurrency_limit' in the
request payload, but that field in the response payload is deprecated.
We instead need to retrieve it from 'global_concurrency_limit.limit'.

* Omit ConcurrencyOptions from payload if null

If ConcurrencyOptions is not configured, it should not be included in
the request payload. We get this by using pointers and checking IsNull
before attempting to add that field to the request payloads.

* Explain unused fields in GlobalConcurrencyLimit

* Mark collision_strategy as required, update docs

* Undo go.mod change

* Check if GlobalConcurrencyLimit is nil

To be safe, let's check if GlobalConcurrencyLimit is nil before
attempting to get its nested value.

Also marks ConcurrencyLimit as a pointer value for consistency.

* Change GlobalConcurrencyLimit.Limit to int64

Since we were casting it as such anyway

* Change ConcurrencyOptions to a custom type

ConcurrencyOptions was a types.Object, which made working with the
object more verbose for a resource that was a simple nested attribute.

Also, getting values from that object was causing the JSON request
payload to include escaped characters for some reason, meaning we needed
a helper function to strip them out.

This changes from types.Object to a custom Go type, allowing much easier
manipulation of the attribute - and removing the need for the custom
helper function to strip out quotes.

---------

Co-authored-by: Mitchell Nielsen <[email protected]>
  • Loading branch information
jimid27 and mitchnielsen authored Dec 5, 2024
1 parent 8a74112 commit 462bb31
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 28 deletions.
10 changes: 10 additions & 0 deletions docs/resources/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ resource "prefect_deployment" "deployment" {
### Optional

- `account_id` (String) Account ID (UUID), defaults to the account set in the provider
- `concurrency_limit` (Number) The deployment's concurrency limit.
- `concurrency_options` (Attributes) Concurrency options for the deployment. (see [below for nested schema](#nestedatt--concurrency_options))
- `description` (String) A description for the deployment.
- `enforce_parameter_schema` (Boolean) Whether or not the deployment should enforce the parameter schema.
- `entrypoint` (String) The path to the entrypoint for the workflow, relative to the path.
Expand All @@ -120,6 +122,14 @@ resource "prefect_deployment" "deployment" {
- `id` (String) Workspace ID (UUID)
- `updated` (String) Timestamp of when the resource was updated (RFC3339)

<a id="nestedatt--concurrency_options"></a>
### Nested Schema for `concurrency_options`

Required:

- `collision_strategy` (String) Enumeration of concurrency collision strategies.


<a id="nestedatt--pull_steps"></a>
### Nested Schema for `pull_steps`

Expand Down
60 changes: 43 additions & 17 deletions internal/api/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,32 @@ type Deployment struct {
AccountID uuid.UUID `json:"account_id"`
WorkspaceID uuid.UUID `json:"workspace_id"`

Description string `json:"description,omitempty"`
EnforceParameterSchema bool `json:"enforce_parameter_schema"`
Entrypoint string `json:"entrypoint"`
FlowID uuid.UUID `json:"flow_id"`
JobVariables map[string]interface{} `json:"job_variables,omitempty"`
ManifestPath string `json:"manifest_path,omitempty"`
Name string `json:"name"`
ParameterOpenAPISchema map[string]interface{} `json:"parameter_openapi_schema,omitempty"`
Parameters map[string]interface{} `json:"parameters,omitempty"`
Path string `json:"path"`
Paused bool `json:"paused"`
PullSteps []PullStep `json:"pull_steps,omitempty"`
StorageDocumentID uuid.UUID `json:"storage_document_id,omitempty"`
Tags []string `json:"tags"`
Version string `json:"version,omitempty"`
WorkPoolName string `json:"work_pool_name,omitempty"`
WorkQueueName string `json:"work_queue_name,omitempty"`
ConcurrencyLimit *int64 `json:"concurrency_limit"`
ConcurrencyOptions *ConcurrencyOptions `json:"concurrency_options,omitempty"`
Description string `json:"description,omitempty"`
EnforceParameterSchema bool `json:"enforce_parameter_schema"`
Entrypoint string `json:"entrypoint"`
FlowID uuid.UUID `json:"flow_id"`
GlobalConcurrencyLimit *GlobalConcurrencyLimit `json:"global_concurrency_limit"`
JobVariables map[string]interface{} `json:"job_variables,omitempty"`
ManifestPath string `json:"manifest_path,omitempty"`
Name string `json:"name"`
ParameterOpenAPISchema map[string]interface{} `json:"parameter_openapi_schema,omitempty"`
Parameters map[string]interface{} `json:"parameters,omitempty"`
Path string `json:"path"`
Paused bool `json:"paused"`
PullSteps []PullStep `json:"pull_steps,omitempty"`
StorageDocumentID uuid.UUID `json:"storage_document_id,omitempty"`
Tags []string `json:"tags"`
Version string `json:"version,omitempty"`
WorkPoolName string `json:"work_pool_name,omitempty"`
WorkQueueName string `json:"work_queue_name,omitempty"`
}

// DeploymentCreate is a subset of Deployment used when creating deployments.
type DeploymentCreate struct {
ConcurrencyLimit *int64 `json:"concurrency_limit,omitempty"`
ConcurrencyOptions *ConcurrencyOptions `json:"concurrency_options,omitempty"`
Description string `json:"description,omitempty"`
EnforceParameterSchema bool `json:"enforce_parameter_schema,omitempty"`
Entrypoint string `json:"entrypoint,omitempty"`
Expand All @@ -63,6 +68,8 @@ type DeploymentCreate struct {

// DeploymentUpdate is a subset of Deployment used when updating deployments.
type DeploymentUpdate struct {
ConcurrencyLimit *int64 `json:"concurrency_limit,omitempty"`
ConcurrencyOptions *ConcurrencyOptions `json:"concurrency_options,omitempty"`
Description string `json:"description,omitempty"`
EnforceParameterSchema bool `json:"enforce_parameter_schema,omitempty"`
Entrypoint string `json:"entrypoint,omitempty"`
Expand All @@ -79,6 +86,25 @@ type DeploymentUpdate struct {
WorkQueueName string `json:"work_queue_name,omitempty"`
}

// ConcurrencyOptions is a representation of the deployment concurrency options.
type ConcurrencyOptions struct {
CollisionStrategy string `json:"collision_strategy"`
}

// GlobalConcurrencyLimit is a representation of the deployment global concurrency limit.
type GlobalConcurrencyLimit struct {
Limit int64 `json:"limit"`

// These other fields exist in the response payload, but we don't make use of them at the
// moment, so we'll leave them disabled for now.
//
// BaseModel
// Active bool `json:"active"`
// Name string `json:"name"`
// ActiveSlots int `json:"active_slots"`
// SlotDecayPerSecond int `json:"slot_decay_per_second"`
}

// PullStep contains instructions for preparing your flows for a deployment run.
type PullStep struct {
// Type is the type of pull step.
Expand Down
64 changes: 62 additions & 2 deletions internal/provider/resources/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/google/uuid"
"github.com/hashicorp/terraform-plugin-framework-jsontypes/jsontypes"
"github.com/hashicorp/terraform-plugin-framework-validators/int64validator"
"github.com/hashicorp/terraform-plugin-framework-validators/stringvalidator"
"github.com/hashicorp/terraform-plugin-framework/attr"
"github.com/hashicorp/terraform-plugin-framework/diag"
Expand Down Expand Up @@ -47,6 +48,8 @@ type DeploymentResourceModel struct {
AccountID customtypes.UUIDValue `tfsdk:"account_id"`
WorkspaceID customtypes.UUIDValue `tfsdk:"workspace_id"`

ConcurrencyLimit types.Int64 `tfsdk:"concurrency_limit"`
ConcurrencyOptions *ConcurrencyOptions `tfsdk:"concurrency_options"`
Description types.String `tfsdk:"description"`
EnforceParameterSchema types.Bool `tfsdk:"enforce_parameter_schema"`
Entrypoint types.String `tfsdk:"entrypoint"`
Expand All @@ -66,6 +69,12 @@ type DeploymentResourceModel struct {
WorkQueueName types.String `tfsdk:"work_queue_name"`
}

// ConcurrentOptions represents the concurrency options for a deployment.
type ConcurrencyOptions struct {
// CollisionStrategy is the strategy to use when a deployment reaches its concurrency limit.
CollisionStrategy types.String `tfsdk:"collision_strategy"`
}

// PullStepModel represents a pull step in a deployment.
type PullStepModel struct {
// Type is the type of pull step.
Expand Down Expand Up @@ -303,6 +312,28 @@ func (r *DeploymentResource) Schema(_ context.Context, _ resource.SchemaRequest,
stringplanmodifier.RequiresReplace(),
},
},
"concurrency_limit": schema.Int64Attribute{
Description: "The deployment's concurrency limit.",
Optional: true,
Computed: true,
Validators: []validator.Int64{
int64validator.AtLeast(1),
},
},
"concurrency_options": schema.SingleNestedAttribute{
Description: "Concurrency options for the deployment.",
Optional: true,
Computed: true,
Attributes: map[string]schema.Attribute{
"collision_strategy": schema.StringAttribute{
Description: "Enumeration of concurrency collision strategies.",
Required: true,
Validators: []validator.String{
stringvalidator.OneOf("ENQUEUE", "CANCEL_NEW"),
},
},
},
},
// Pull steps are polymorphic and can have different schemas based on the pull step type.
// In the resource schema, we only make `type` required. The other attributes are needed
// based on the pull step type, which we'll validate in the resource layer.
Expand Down Expand Up @@ -468,6 +499,18 @@ func copyDeploymentToModel(ctx context.Context, deployment *api.Deployment, mode
}
model.Tags = tags

// The concurrency_limit field in the response payload is deprecated, and will always be 0
// for compatibility. The true value has been moved under `global_concurrency_limit.limit`.
if deployment.GlobalConcurrencyLimit != nil {
model.ConcurrencyLimit = types.Int64Value(deployment.GlobalConcurrencyLimit.Limit)
}

if deployment.ConcurrencyOptions != nil {
model.ConcurrencyOptions = &ConcurrencyOptions{
CollisionStrategy: types.StringValue(deployment.ConcurrencyOptions.CollisionStrategy),
}
}

pullSteps, diags := mapPullStepsAPIToTerraform(deployment.PullSteps)
diags.Append(diags...)
if diags.HasError() {
Expand Down Expand Up @@ -527,7 +570,8 @@ func (r *DeploymentResource) Create(ctx context.Context, req resource.CreateRequ
return
}

deployment, err := client.Create(ctx, api.DeploymentCreate{
createPayload := api.DeploymentCreate{
ConcurrencyLimit: plan.ConcurrencyLimit.ValueInt64Pointer(),
Description: plan.Description.ValueString(),
EnforceParameterSchema: plan.EnforceParameterSchema.ValueBool(),
Entrypoint: plan.Entrypoint.ValueString(),
Expand All @@ -545,7 +589,15 @@ func (r *DeploymentResource) Create(ctx context.Context, req resource.CreateRequ
WorkPoolName: plan.WorkPoolName.ValueString(),
WorkQueueName: plan.WorkQueueName.ValueString(),
ParameterOpenAPISchema: parameterOpenAPISchema,
})
}

if plan.ConcurrencyOptions != nil {
createPayload.ConcurrencyOptions = &api.ConcurrencyOptions{
CollisionStrategy: plan.ConcurrencyOptions.CollisionStrategy.ValueString(),
}
}

deployment, err := client.Create(ctx, createPayload)
if err != nil {
resp.Diagnostics.AddError(
"Error creating deployment",
Expand Down Expand Up @@ -694,6 +746,7 @@ func (r *DeploymentResource) Update(ctx context.Context, req resource.UpdateRequ
}

payload := api.DeploymentUpdate{
ConcurrencyLimit: model.ConcurrencyLimit.ValueInt64Pointer(),
Description: model.Description.ValueString(),
EnforceParameterSchema: model.EnforceParameterSchema.ValueBool(),
Entrypoint: model.Entrypoint.ValueString(),
Expand All @@ -708,6 +761,13 @@ func (r *DeploymentResource) Update(ctx context.Context, req resource.UpdateRequ
WorkPoolName: model.WorkPoolName.ValueString(),
WorkQueueName: model.WorkQueueName.ValueString(),
}

if !model.ConcurrencyOptions.CollisionStrategy.IsNull() {
payload.ConcurrencyOptions = &api.ConcurrencyOptions{
CollisionStrategy: model.ConcurrencyOptions.CollisionStrategy.ValueString(),
}
}

err = client.Update(ctx, deploymentID, payload)

if err != nil {
Expand Down
32 changes: 23 additions & 9 deletions internal/provider/resources/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type deploymentConfig struct {
DeploymentName string
DeploymentResourceName string

ConcurrencyLimit int
CollisionStrategy string
Description string
EnforceParameterSchema bool
Entrypoint string
Expand Down Expand Up @@ -77,6 +79,10 @@ resource "prefect_flow" "{{.FlowName}}" {
resource "prefect_deployment" "{{.DeploymentName}}" {
name = "{{.DeploymentName}}"
description = "{{.Description}}"
concurrency_limit = {{.ConcurrencyLimit}}
concurrency_options = {
collision_strategy = "{{.CollisionStrategy}}"
}
enforce_parameter_schema = {{.EnforceParameterSchema}}
entrypoint = "{{.Entrypoint}}"
flow_id = prefect_flow.{{.FlowName}}.id
Expand Down Expand Up @@ -151,6 +157,8 @@ func TestAccResource_deployment(t *testing.T) {
DeploymentResourceName: fmt.Sprintf("prefect_deployment.%s", deploymentName),
Workspace: workspace.Resource,

ConcurrencyLimit: 1,
CollisionStrategy: "ENQUEUE",
Description: "My deployment description",
EnforceParameterSchema: false,
Entrypoint: "hello_world.py:hello_world",
Expand Down Expand Up @@ -182,15 +190,17 @@ func TestAccResource_deployment(t *testing.T) {
WorkPoolName: cfgCreate.WorkPoolName,

// Configure new values to test the update.
Description: "My deployment description v2",
Entrypoint: "hello_world.py:hello_world2",
JobVariables: `{"env":{"some-key":"some-value2"}}`,
ManifestPath: "some-manifest-path2",
Parameters: "some-value2",
Path: "some-path2",
Paused: true,
Version: "v1.1.2",
WorkQueueName: "default",
ConcurrencyLimit: 2,
CollisionStrategy: "CANCEL_NEW",
Description: "My deployment description v2",
Entrypoint: "hello_world.py:hello_world2",
JobVariables: `{"env":{"some-key":"some-value2"}}`,
ManifestPath: "some-manifest-path2",
Parameters: "some-value2",
Path: "some-path2",
Paused: true,
Version: "v1.1.2",
WorkQueueName: "default",

// Enforcing parameter schema returns the following error:
//
Expand Down Expand Up @@ -252,6 +262,8 @@ func TestAccResource_deployment(t *testing.T) {
pullSteps: cfgCreate.PullSteps,
parameterOpenapiSchema: parameterOpenAPISchemaMap,
}),
resource.TestCheckResourceAttr(cfgCreate.DeploymentResourceName, "concurrency_limit", strconv.Itoa(cfgCreate.ConcurrencyLimit)),
resource.TestCheckResourceAttr(cfgCreate.DeploymentResourceName, "concurrency_options.collision_strategy", cfgCreate.CollisionStrategy),
resource.TestCheckResourceAttr(cfgCreate.DeploymentResourceName, "enforce_parameter_schema", strconv.FormatBool(cfgCreate.EnforceParameterSchema)),
resource.TestCheckResourceAttr(cfgCreate.DeploymentResourceName, "entrypoint", cfgCreate.Entrypoint),
resource.TestCheckResourceAttr(cfgCreate.DeploymentResourceName, "job_variables", cfgCreate.JobVariables),
Expand Down Expand Up @@ -279,6 +291,8 @@ func TestAccResource_deployment(t *testing.T) {
pullSteps: cfgUpdate.PullSteps,
parameterOpenapiSchema: parameterOpenAPISchemaMap,
}),
resource.TestCheckResourceAttr(cfgUpdate.DeploymentResourceName, "concurrency_limit", strconv.Itoa(cfgUpdate.ConcurrencyLimit)),
resource.TestCheckResourceAttr(cfgUpdate.DeploymentResourceName, "concurrency_options.collision_strategy", cfgUpdate.CollisionStrategy),
resource.TestCheckResourceAttr(cfgUpdate.DeploymentResourceName, "enforce_parameter_schema", strconv.FormatBool(cfgUpdate.EnforceParameterSchema)),
resource.TestCheckResourceAttr(cfgUpdate.DeploymentResourceName, "entrypoint", cfgUpdate.Entrypoint),
resource.TestCheckResourceAttr(cfgCreate.DeploymentResourceName, "job_variables", cfgUpdate.JobVariables),
Expand Down

0 comments on commit 462bb31

Please sign in to comment.