stateful deployments: use TaskGroupVolumeClaim table to associate volume requests with volume IDs (#24993)

We introduce an alternative solution to the one presented in #24960, one that is
based on the state store rather than on previous-next allocation tracking in the
reconciler. This new solution reduces the cognitive complexity of the scheduler
code at the cost of slightly more boilerplate, but it also opens up new
possibilities in the future, e.g., allowing users to explicitly "un-stick"
volumes while workloads are still running.

The diagram below illustrates the new logic:

     SetVolumes()                                               upsertAllocsImpl()          
     sets ns, job                             +-----------------checks if alloc requests    
     tg in the scheduler                      v                 sticky vols and consults    
            |                  +-----------------------+        state. If there is no claim,
            |                  | TaskGroupVolumeClaim: |        it creates one.             
            |                  | - namespace           |                                    
            |                  | - jobID               |                                    
            |                  | - tg name             |                                    
            |                  | - vol ID              |                                    
            v                  | uniquely identify vol |                                    
     hasVolumes()              +----+------------------+                                    
     consults the state             |           ^                                           
     and returns true               |           |               DeleteJobTxn()              
     if there's a match <-----------+           +---------------removes the claim from      
     or if there is no                                          the state                   
     previous claim                                                                         
|                             | |                                                      |    
+-----------------------------+ +------------------------------------------------------+    
                                                                                            
           scheduler                                  state store
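
For illustration, the hasVolumes() check in the diagram can be sketched roughly as
follows. This is a hedged sketch, not the committed scheduler code: the function
name, signature, package placement, and direct use of *state.StateStore are
assumptions; only the TaskGroupHostVolumeClaim state store methods it calls are
introduced by this commit.

package scheduler // sketch only: the real check's placement and wiring differ

import (
	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
)

// hasVolumesSketch mirrors the behavior in the diagram: a candidate volume on a
// node passes if it matches an existing claim for this task group, or if the
// task group has no previous claim at all.
func hasVolumesSketch(ss *state.StateStore, ns, jobID, tg, candidateVolID string) (bool, error) {
	iter, err := ss.GetTaskGroupHostVolumeClaimsForTaskGroup(nil, ns, jobID, tg)
	if err != nil {
		return false, err
	}

	claimed := false
	for raw := iter.Next(); raw != nil; raw = iter.Next() {
		claim := raw.(*structs.TaskGroupHostVolumeClaim)
		claimed = true
		if claim.VolumeID == candidateVolID {
			// this sticky volume was previously claimed by the task group
			return true, nil
		}
	}

	// no previous claim for this task group: the candidate volume is acceptable
	return !claimed, nil
}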
pkazmierczak authored Feb 7, 2025
1 parent 3493551 commit 611452e
Showing 16 changed files with 610 additions and 130 deletions.
1 change: 0 additions & 1 deletion api/allocations.go
@@ -278,7 +278,6 @@ type Allocation struct {
Resources *Resources
TaskResources map[string]*Resources
AllocatedResources *AllocatedResources
- HostVolumeIDs []string
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
1 change: 1 addition & 0 deletions helper/raftutil/msgtypes.go

Some generated files are not rendered by default.

16 changes: 16 additions & 0 deletions nomad/mock/host_volumes.go
@@ -46,3 +46,19 @@ func HostVolume() *structs.HostVolume {
vol.HostPath = "/var/data/nomad/alloc_mounts/" + volID
return vol
}

// TaskGroupHostVolumeClaim creates a claim for a given job, alloc and host
// volume request
func TaskGroupHostVolumeClaim(job *structs.Job, alloc *structs.Allocation, dhv *structs.HostVolume) *structs.TaskGroupHostVolumeClaim {
return &structs.TaskGroupHostVolumeClaim{
ID: uuid.Generate(),
Namespace: structs.DefaultNamespace,
JobID: job.ID,
TaskGroupName: job.TaskGroups[0].Name,
AllocID: alloc.ID,
VolumeID: dhv.ID,
VolumeName: dhv.Name,
CreateIndex: 1000,
ModifyIndex: 1000,
}
}
65 changes: 51 additions & 14 deletions nomad/state/schema.go
@@ -15,20 +15,21 @@ import (
const (
tableIndex = "index"

- TableNamespaces           = "namespaces"
- TableNodePools            = "node_pools"
- TableServiceRegistrations = "service_registrations"
- TableVariables            = "variables"
- TableVariablesQuotas      = "variables_quota"
- TableRootKeys             = "root_keys"
- TableACLRoles             = "acl_roles"
- TableACLAuthMethods       = "acl_auth_methods"
- TableACLBindingRules      = "acl_binding_rules"
- TableAllocs               = "allocs"
- TableJobSubmission        = "job_submission"
- TableHostVolumes          = "host_volumes"
- TableCSIVolumes           = "csi_volumes"
- TableCSIPlugins           = "csi_plugins"
+ TableNamespaces               = "namespaces"
+ TableNodePools                = "node_pools"
+ TableServiceRegistrations     = "service_registrations"
+ TableVariables                = "variables"
+ TableVariablesQuotas          = "variables_quota"
+ TableRootKeys                 = "root_keys"
+ TableACLRoles                 = "acl_roles"
+ TableACLAuthMethods           = "acl_auth_methods"
+ TableACLBindingRules          = "acl_binding_rules"
+ TableAllocs                   = "allocs"
+ TableJobSubmission            = "job_submission"
+ TableHostVolumes              = "host_volumes"
+ TableCSIVolumes               = "csi_volumes"
+ TableCSIPlugins               = "csi_plugins"
+ TableTaskGroupHostVolumeClaim = "task_volume"
)

const (
@@ -102,6 +103,7 @@ func init() {
aclAuthMethodsTableSchema,
bindingRulesTableSchema,
hostVolumeTableSchema,
taskGroupHostVolumeClaimSchema,
}...)
}

@@ -1706,3 +1708,38 @@ func hostVolumeTableSchema() *memdb.TableSchema {
},
}
}

func taskGroupHostVolumeClaimSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: TableTaskGroupHostVolumeClaim,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
AllowMissing: false,
Unique: true,

// Use a compound index so the combination of (Namespace, JobID, TaskGroupName,
// VolumeID) is uniquely identifying
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Namespace",
},

&memdb.StringFieldIndex{
Field: "JobID",
},

&memdb.StringFieldIndex{
Field: "TaskGroupName",
},

&memdb.StringFieldIndex{
Field: "VolumeID",
},
},
},
},
},
}
}
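
As a point of reference, the compound index above is queried by supplying all four
key parts in declaration order. The helper below is an illustrative sketch only: it
mirrors what the new state store accessors later in this diff already do, and it
assumes it lives in the nomad/state package alongside the schema, where indexID,
the table name constant, fmt and go-memdb are already available.

// claimByKeySketch performs a point read against the compound "id" index:
// namespace, job ID, task group name, then volume ID, in that order.
func claimByKeySketch(txn *memdb.Txn, ns, jobID, tg, volID string) (*structs.TaskGroupHostVolumeClaim, error) {
	raw, err := txn.First(TableTaskGroupHostVolumeClaim, indexID, ns, jobID, tg, volID)
	if err != nil {
		return nil, fmt.Errorf("task group volume claim lookup failed: %v", err)
	}
	if raw == nil {
		return nil, nil
	}
	return raw.(*structs.TaskGroupHostVolumeClaim), nil
}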
62 changes: 59 additions & 3 deletions nomad/state/state_store.go
@@ -20,6 +20,7 @@ import (
"github.com/hashicorp/go-set/v3"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib/lang"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
@@ -2025,6 +2026,12 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
if _, err = txn.DeleteAll("scaling_event", "id", namespace, jobID); err != nil {
return fmt.Errorf("deleting job scaling events failed: %v", err)
}

// Delete task group volume claims
if err = s.deleteTaskGroupHostVolumeClaim(index, txn, namespace, jobID); err != nil {
return fmt.Errorf("deleting job volume claims failed: %v", err)
}

if err := txn.Insert("index", &IndexEntry{"scaling_event", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
@@ -4112,11 +4119,11 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
}

// Issue https://github.com/hashicorp/nomad/issues/2583 uncovered
- // the a race between a forced garbage collection and the scheduler
+ // a race between a forced garbage collection and the scheduler
// marking an allocation as terminal. The issue is that the
// allocation from the scheduler has its job normalized and the FSM
- // will only denormalize if the allocation is not terminal. However
- // if the allocation is garbage collected, that will result in a
+ // will only denormalize if the allocation is not terminal. However
+ // if the allocation is garbage collected, that will result in an
// allocation being upserted for the first time without a job
// attached. By returning an error here, it will cause the FSM to
// error, causing the plan_apply to error and thus causing the
@@ -4125,6 +4132,55 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
if alloc.Job == nil {
return fmt.Errorf("attempting to upsert allocation %q without a job", alloc.ID)
}

// Check if the alloc requests sticky volumes. If it does, find the
// matching volume on the alloc's node and update the task group
// volume claims table.
for _, tg := range alloc.Job.TaskGroups {
for _, v := range tg.Volumes {
if !v.Sticky {
continue
}
sv := &structs.TaskGroupHostVolumeClaim{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
JobID: alloc.JobID,
TaskGroupName: tg.Name,
AllocID: alloc.ID,
VolumeName: v.Source,
}

allocNode, err := s.NodeByID(nil, alloc.NodeID)
if err != nil {
return err
}

// find the matching volume on the node and register a claim unless one already exists
for _, v := range allocNode.HostVolumes {
if v.Name != sv.VolumeName {
continue
}

sv.VolumeID = v.ID

// has this volume been claimed already?
existingClaim, err := s.GetTaskGroupHostVolumeClaim(nil, sv.Namespace, sv.JobID, sv.TaskGroupName, v.ID)
if err != nil {
return err
}

// if the volume has already been claimed, we don't have to do anything. The
// feasibility checker in the scheduler will verify alloc placement.
if existingClaim != nil {
continue
}

if err := s.upsertTaskGroupHostVolumeClaimImpl(index, sv, txn); err != nil {
return err
}
}
}
}
} else {
alloc.CreateIndex = exist.CreateIndex
alloc.ModifyIndex = index
138 changes: 138 additions & 0 deletions nomad/state/state_store_task_group_volume_claims.go
@@ -0,0 +1,138 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package state

import (
"fmt"

"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)

// UpsertTaskGroupHostVolumeClaim is used to upsert claims into the state store.
// This method is only used in unit tests.
func (s *StateStore) UpsertTaskGroupHostVolumeClaim(msgType structs.MessageType, index uint64, claim *structs.TaskGroupHostVolumeClaim) error {
// Grab a write transaction.
txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()
if err := s.upsertTaskGroupHostVolumeClaimImpl(index, claim, txn); err != nil {
return err
}

return txn.Commit()
}

// upsertTaskGroupHostVolumeClaimImpl is used to insert a task group volume claim into
// the state store.
func (s *StateStore) upsertTaskGroupHostVolumeClaimImpl(
index uint64, claim *structs.TaskGroupHostVolumeClaim, txn *txn) error {

existingRaw, err := txn.First(TableTaskGroupHostVolumeClaim, indexID, claim.Namespace, claim.JobID, claim.TaskGroupName, claim.VolumeID)
if err != nil {
return fmt.Errorf("Task group volume association lookup failed: %v", err)
}

var existing *structs.TaskGroupHostVolumeClaim
if existingRaw != nil {
existing = existingRaw.(*structs.TaskGroupHostVolumeClaim)
}

if existing != nil {
// do allocation ID and volume ID match?
if existing.ClaimedByAlloc(claim) {
return nil
}

claim.CreateIndex = existing.CreateIndex
claim.ModifyIndex = index
} else {
claim.CreateIndex = index
claim.ModifyIndex = index
}

// Insert the claim into the table.
if err := txn.Insert(TableTaskGroupHostVolumeClaim, claim); err != nil {
return fmt.Errorf("Task group volume claim insert failed: %v", err)
}

// Perform the index table update to mark the new insert.
if err := txn.Insert(tableIndex, &IndexEntry{TableTaskGroupHostVolumeClaim, index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}

return nil
}

// GetTaskGroupHostVolumeClaim returns the volume claim that matches the given
// namespace, job ID, task group name and volume ID (there can be only one)
func (s *StateStore) GetTaskGroupHostVolumeClaim(ws memdb.WatchSet, namespace, jobID, taskGroupName, volumeID string) (*structs.TaskGroupHostVolumeClaim, error) {
txn := s.db.ReadTxn()

watchCh, existing, err := txn.FirstWatch(TableTaskGroupHostVolumeClaim, indexID, namespace, jobID, taskGroupName, volumeID)
if err != nil {
return nil, fmt.Errorf("Task group volume claim lookup failed: %v", err)
}
ws.Add(watchCh)

if existing != nil {
return existing.(*structs.TaskGroupHostVolumeClaim), nil
}

return nil, nil
}

// GetTaskGroupHostVolumeClaims returns all volume claims
func (s *StateStore) GetTaskGroupHostVolumeClaims(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

iter, err := txn.Get(TableTaskGroupHostVolumeClaim, indexID)
if err != nil {
return nil, fmt.Errorf("Task group volume claim lookup failed: %v", err)
}
ws.Add(iter.WatchCh())

return iter, nil
}

// GetTaskGroupHostVolumeClaimsForTaskGroup returns all volume claims for a given
// task group
func (s *StateStore) GetTaskGroupHostVolumeClaimsForTaskGroup(ws memdb.WatchSet, ns, jobID, tg string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

iter, err := txn.Get(TableTaskGroupHostVolumeClaim, indexID)
if err != nil {
return nil, fmt.Errorf("Task group volume claim lookup failed: %v", err)
}
ws.Add(iter.WatchCh())

// Filter out claims that do not match the given ns, jobID and tg
filter := memdb.NewFilterIterator(iter, func(raw interface{}) bool {
claim, ok := raw.(*structs.TaskGroupHostVolumeClaim)
if !ok {
return true
}
return claim.Namespace != ns || claim.JobID != jobID || claim.TaskGroupName != tg
})

return filter, nil
}

// deleteTaskGroupHostVolumeClaim deletes all claims for a given namespace and job ID
func (s *StateStore) deleteTaskGroupHostVolumeClaim(index uint64, txn *txn, namespace, jobID string) error {
iter, err := txn.Get(TableTaskGroupHostVolumeClaim, indexID)
if err != nil {
return fmt.Errorf("Task group volume claim lookup failed: %v", err)
}

for raw := iter.Next(); raw != nil; raw = iter.Next() {
claim := raw.(*structs.TaskGroupHostVolumeClaim)
if claim.JobID == jobID && claim.Namespace == namespace {
if err := txn.Delete(TableTaskGroupHostVolumeClaim, claim); err != nil {
return fmt.Errorf("Task group volume claim deletion failed: %v", err)
}
}
}

return nil
}
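
To show how the pieces above fit together, here is a hedged sketch of a
unit-test-style roundtrip against the new state store methods. It is not part of
this commit: it assumes the state package's existing testStateStore helper and the
mock.Job, mock.Alloc, mock.HostVolume and mock.TaskGroupHostVolumeClaim
constructors shown or referenced earlier in this diff.

package state

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
)

func TestTaskGroupHostVolumeClaim_roundtripSketch(t *testing.T) {
	store := testStateStore(t) // assumed existing test helper in this package

	job := mock.Job()
	alloc := mock.Alloc()
	vol := mock.HostVolume()
	claim := mock.TaskGroupHostVolumeClaim(job, alloc, vol)

	// UpsertTaskGroupHostVolumeClaim is the test-only write entry point.
	if err := store.UpsertTaskGroupHostVolumeClaim(structs.MsgTypeTestSetup, 1000, claim); err != nil {
		t.Fatalf("upsert failed: %v", err)
	}

	// Read the claim back via its compound (namespace, job, task group, volume) key.
	got, err := store.GetTaskGroupHostVolumeClaim(nil, claim.Namespace, claim.JobID, claim.TaskGroupName, claim.VolumeID)
	if err != nil {
		t.Fatalf("lookup failed: %v", err)
	}
	if got == nil || got.AllocID != claim.AllocID {
		t.Fatalf("expected claim for alloc %q, got %+v", claim.AllocID, got)
	}
}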