stateful deployments: use TaskGroupVolumeClaim table to associate volume requests with volume IDs #24993

Merged on Feb 7, 2025, with 28 commits. The diff below shows changes from 11 of those commits.

Commits
47e8969
taskVolumeAssignmentSchema
pkazmierczak Jan 30, 2025
7f7fecd
state store methods and struct definition
pkazmierczak Jan 30, 2025
b0cc34f
wip
pkazmierczak Jan 31, 2025
22f5827
clean up
pkazmierczak Jan 31, 2025
42a727a
a few missing pieces
pkazmierczak Jan 31, 2025
727e564
Tim's comments
pkazmierczak Feb 3, 2025
3cb76b5
better schema?
pkazmierczak Feb 3, 2025
088d525
wip
pkazmierczak Feb 3, 2025
d0f5092
remove host volume IDs field from allocation
pkazmierczak Feb 4, 2025
0a11627
remove from api
pkazmierczak Feb 4, 2025
554dda9
working prototype
pkazmierczak Feb 4, 2025
10599dc
clean-ups and Tim's comments
pkazmierczak Feb 4, 2025
06b3f98
feasibility test correction
pkazmierczak Feb 5, 2025
0e7ae55
basic test
pkazmierczak Feb 5, 2025
acecaa6
msgtypes
pkazmierczak Feb 5, 2025
13a257c
upsertAllocs test
pkazmierczak Feb 5, 2025
a6b5777
fix claim deletion
pkazmierczak Feb 5, 2025
8a0cec1
refactor the logic in hasVolumes
pkazmierczak Feb 5, 2025
a516ba7
simplify conditional
pkazmierczak Feb 5, 2025
4261a48
better test
pkazmierczak Feb 6, 2025
240d1dd
renamed to emphasize it's about host volumes
pkazmierczak Feb 6, 2025
6b68667
Tim's comment
pkazmierczak Feb 6, 2025
aa56c30
keep track of "used" claims
pkazmierczak Feb 6, 2025
408212c
remove TaskGroupHostVolumeClaimRegisterRequestType msg
pkazmierczak Feb 6, 2025
e90b923
test for the upsert method
pkazmierczak Feb 7, 2025
34c1bf5
update feasibility checker to only fetch task group claims
pkazmierczak Feb 7, 2025
34f139e
fix feasibility test
pkazmierczak Feb 7, 2025
eb29282
Tim's comments
pkazmierczak Feb 7, 2025
1 change: 0 additions & 1 deletion api/allocations.go
@@ -278,7 +278,6 @@ type Allocation struct {
Resources *Resources
TaskResources map[string]*Resources
AllocatedResources *AllocatedResources
HostVolumeIDs []string
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
17 changes: 17 additions & 0 deletions nomad/fsm.go
@@ -389,6 +389,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyHostVolumeRegister(msgType, buf[1:], log.Index)
case structs.HostVolumeDeleteRequestType:
return n.applyHostVolumeDelete(msgType, buf[1:], log.Index)
// case structs.TaskGroupVolumeClaimDeleteRequestType:
// return n.applyTaskVolumeClaimDelete(buf[1:], log.Index)
}

// Check enterprise only message types.
@@ -2450,6 +2452,21 @@ func (n *nomadFSM) applyHostVolumeDelete(msgType structs.MessageType, buf []byte
return nil
}

// func (n *nomadFSM) applyTaskVolumeClaimDelete(buf []byte, index uint64) interface{} {
// defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_task_group_volume_claim_delete"}, time.Now())

// var req structs.TaskGroupVolumeClaimDeleteRequest
// if err := structs.Decode(buf, &req); err != nil {
// panic(fmt.Errorf("failed to decode request: %v", err))
// }

// if err := n.state.DeleteTaskGroupVolumeClaim(index, req.ClaimID); err != nil {
// n.logger.Error("DeleteTaskGroupVolumeClaim failed", "error", err)
// return err
// }
// return nil
// }

func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error {
defer metrics.MeasureSince([]string{"nomad", "fsm", "persist"}, time.Now())
// Register the nodes
42 changes: 42 additions & 0 deletions nomad/state/schema.go
@@ -29,6 +29,7 @@ const (
TableHostVolumes = "host_volumes"
TableCSIVolumes = "csi_volumes"
TableCSIPlugins = "csi_plugins"
TableTaskGroupVolumeClaim = "task_volume"
)

const (
@@ -45,6 +46,7 @@ const (
indexSigningKey = "signing_key"
indexAuthMethod = "auth_method"
indexNodePool = "node_pool"
indexVolumeID = "volume_id"
)

var (
@@ -102,6 +104,7 @@ func init() {
aclAuthMethodsTableSchema,
bindingRulesTableSchema,
hostVolumeTableSchema,
taskVolumeClaimSchema,
}...)
}

@@ -1706,3 +1709,42 @@ func hostVolumeTableSchema() *memdb.TableSchema {
},
}
}

func taskVolumeClaimSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: TableTaskGroupVolumeClaim,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
AllowMissing: false,
Unique: true,

// Use a compound index so the combination of (Namespace, JobID, TaskGroupName)
// is uniquely identifying
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Namespace",
},

&memdb.StringFieldIndex{
Field: "JobID",
},

&memdb.StringFieldIndex{
Field: "TaskGroupName",
},
},
},
},
indexVolumeID: {
Name: indexVolumeID,
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "VolumeID",
},
},
},
}
}
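With this schema a claim is addressed by the ordered triple (Namespace, JobID, TaskGroupName), with a secondary non-unique lookup by VolumeID. A minimal sketch of a lookup against the compound index inside an open memdb read transaction (variable names are illustrative; this mirrors the GetTaskGroupVolumeClaim accessor added later in this PR):

    // Illustrative only: fetch the single claim recorded for a task group.
    // All three components of the compound "id" index must be supplied, in order.
    raw, err := txn.First(TableTaskGroupVolumeClaim, indexID, namespace, jobID, taskGroupName)
    if err != nil {
        return nil, fmt.Errorf("task group volume claim lookup failed: %v", err)
    }
    if raw == nil {
        return nil, nil // no claim recorded yet for this task group
    }
    return raw.(*structs.TaskGroupVolumeClaim), nil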
56 changes: 56 additions & 0 deletions nomad/state/state_store.go
@@ -2025,6 +2025,14 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
if _, err = txn.DeleteAll("scaling_event", "id", namespace, jobID); err != nil {
return fmt.Errorf("deleting job scaling events failed: %v", err)
}

// Delete task group volume claims
for _, tg := range job.TaskGroups {
if _, err = txn.DeleteAll(TableTaskGroupVolumeClaim, indexID, namespace, jobID, tg.Name); err != nil {
return fmt.Errorf("deleting job volume claims failed: %v", err)
}
}

if err := txn.Insert("index", &IndexEntry{"scaling_event", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
@@ -4125,6 +4133,54 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
if alloc.Job == nil {
return fmt.Errorf("attempting to upsert allocation %q without a job", alloc.ID)
}

// Check if the alloc requires sticky volumes. If yes, find a node
// that has the right volume and update the task group volume
// claims table
stickyVolumes := []*structs.TaskGroupVolumeClaim{}
for _, tg := range alloc.Job.TaskGroups {
for _, v := range tg.Volumes {
if !v.Sticky {
continue
}
stickyVolumes = append(stickyVolumes, &structs.TaskGroupVolumeClaim{
Namespace: alloc.Namespace,
JobID: alloc.JobID,
TaskGroupName: tg.Name,
AllocID: alloc.ID,
VolumeName: v.Source,
})
}
}

if len(stickyVolumes) > 0 {
for _, sv := range stickyVolumes {
// has this volume been claimed already?
existingClaim, err := s.GetTaskGroupVolumeClaim(nil, sv.Namespace, sv.JobID, sv.TaskGroupName)
if err != nil {
return err
}

if existingClaim == nil {
allocNode, err := s.NodeByID(nil, alloc.NodeID)
if err != nil {
return err
}

// since there's no existing claim, find a volume and register a claim
for _, v := range allocNode.HostVolumes {
if v.Name != sv.VolumeName {
continue
}

sv.VolumeID = v.ID
if err := s.upsertTaskGroupVolumeClaimImpl(index, sv, txn); err != nil {
return err
}
}
}
}
}
} else {
alloc.CreateIndex = exist.CreateIndex
alloc.ModifyIndex = index
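A claim is only recorded for volume requests marked sticky. For context, a minimal sketch of the kind of task group volume request the loop above matches on; the field values are illustrative, and VolumeRequest and VolumeTypeHost are the existing Nomad types referenced via tg.Volumes:

    // Illustrative: a task group requesting a sticky host volume. Only requests
    // with Sticky set will produce a TaskGroupVolumeClaim entry when the
    // allocation is upserted.
    tg.Volumes = map[string]*structs.VolumeRequest{
        "data": {
            Name:   "data",
            Type:   structs.VolumeTypeHost,
            Source: "shared-data",
            Sticky: true,
        },
    }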
132 changes: 132 additions & 0 deletions nomad/state/state_store_task_group_volume_claims.go
@@ -0,0 +1,132 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package state

import (
"errors"
"fmt"
"time"

"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
)

// upsertTaskGroupVolumeClaimImpl is used to insert a task group volume claim into
// the state store.
func (s *StateStore) upsertTaskGroupVolumeClaimImpl(
index uint64, claim *structs.TaskGroupVolumeClaim, txn *txn) error {

existingRaw, err := txn.First(TableTaskGroupVolumeClaim, indexID, claim.Namespace, claim.JobID, claim.TaskGroupName)
if err != nil {
return fmt.Errorf("Task group volume association lookup failed: %v", err)
}

var existing *structs.TaskGroupVolumeClaim
if existingRaw != nil {
existing = existingRaw.(*structs.TaskGroupVolumeClaim)
}

now := time.Now().UTC()

if existing != nil {
// do allocation ID and volume ID match?
if existing.ClaimedByAlloc(claim) {
return nil
}

claim.CreateIndex = existing.CreateIndex
claim.ModifyIndex = index
claim.CreateTime = existing.CreateTime
claim.ModifyTime = now
} else {
claim.CreateIndex = index
claim.ModifyIndex = index
claim.CreateTime = now
claim.ModifyTime = now
}

// Insert the claim into the table.
if err := txn.Insert(TableTaskGroupVolumeClaim, claim); err != nil {
return fmt.Errorf("Task group volume claim insert failed: %v", err)
}

// Perform the index table update to mark the new insert.
if err := txn.Insert(tableIndex, &IndexEntry{TableTaskGroupVolumeClaim, index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}

return nil
}

// DeleteTaskGroupVolumeClaim is responsible for deleting volume claims.
func (s *StateStore) DeleteTaskGroupVolumeClaim(index uint64, namespace, jobID, taskGroupName string) error {
txn := s.db.WriteTxnMsgT(structs.TaskGroupVolumeClaimDeleteRequestType, index)
defer txn.Abort()

existing, err := txn.First(TableTaskGroupVolumeClaim, indexID, namespace, jobID, taskGroupName)
if err != nil {
return fmt.Errorf("Task group volume claim lookup failed: %v", err)
}
if existing == nil {
return errors.New("ACL binding rule not found")
}

// Delete the existing entry from the table.
if err := txn.Delete(TableTaskGroupVolumeClaim, existing); err != nil {
return fmt.Errorf("Task group volume claim deletion failed: %v", err)
}

// Update the index table to indicate an update has occurred.
if err := txn.Insert(tableIndex, &IndexEntry{TableTaskGroupVolumeClaim, index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}

return txn.Commit()
}

// GetTaskGroupVolumeClaims returns an iterator that contains all task group
// volume associations stored within state.
func (s *StateStore) GetTaskGroupVolumeClaims(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

iter, err := txn.Get(TableTaskGroupVolumeClaim, indexID)
if err != nil {
return nil, fmt.Errorf("Task group volume claims lookup failed: %v", err)
}
ws.Add(iter.WatchCh())

return iter, nil
}

// GetTaskGroupVolumeClaimsByVolumeID gets an iterator that contains all task
// group volume claims that claim a particular volume ID
func (s *StateStore) GetTaskGroupVolumeClaimsByVolumeID(ws memdb.WatchSet, volID string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()

iter, err := txn.Get(TableTaskGroupVolumeClaim, indexVolumeID, volID)
if err != nil {
return nil, fmt.Errorf("Task group volume claims lookup failed: %v", err)
}
ws.Add(iter.WatchCh())

return iter, nil
}

// GetTaskGroupVolumeClaim returns a volume claim that matches the namespace,
// job id and task group name (there can be only one)
func (s *StateStore) GetTaskGroupVolumeClaim(ws memdb.WatchSet, namespace, jobID, taskGroupName string) (*structs.TaskGroupVolumeClaim, error) {
txn := s.db.ReadTxn()

watchCh, existing, err := txn.FirstWatch(TableTaskGroupVolumeClaim, indexID, namespace, jobID, taskGroupName)
if err != nil {
return nil, fmt.Errorf("Task group volume claim lookup failed: %v", err)
}
ws.Add(watchCh)

if existing != nil {
return existing.(*structs.TaskGroupVolumeClaim), nil
}

return nil, nil
}
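A short test-style sketch of how the iterator-returning accessors could be consumed (assumes a populated *StateStore named store and a *testing.T named t; names are illustrative). Passing a nil WatchSet mirrors how the upsert path above calls GetTaskGroupVolumeClaim:

    // Illustrative: walk every claim currently in state and log which volume
    // each task group is pinned to.
    iter, err := store.GetTaskGroupVolumeClaims(nil)
    if err != nil {
        t.Fatal(err)
    }
    for raw := iter.Next(); raw != nil; raw = iter.Next() {
        claim := raw.(*structs.TaskGroupVolumeClaim)
        t.Logf("%s/%s/%s -> alloc %s, volume %s",
            claim.Namespace, claim.JobID, claim.TaskGroupName, claim.AllocID, claim.VolumeID)
    }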
9 changes: 9 additions & 0 deletions nomad/structs/host_volumes.go
@@ -428,3 +428,12 @@ type HostVolumeListResponse struct {
Volumes []*HostVolumeStub
QueryMeta
}

type TaskGroupVolumeClaimDeleteRequest struct {
ClaimID string
WriteRequest
}

type TaskGroupVolumeClaimDeleteResponse struct {
WriteMeta
}
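The TaskGroupVolumeClaim struct itself is introduced in a commit whose file is not part of this view. Reconstructed from the fields and the ClaimedByAlloc call used across the diff, its shape is roughly the following; this is an inferred sketch, not the verbatim definition:

    // Inferred sketch of the claim record the state store code above operates on;
    // the field set is reconstructed from usage in this PR.
    type TaskGroupVolumeClaim struct {
        Namespace     string
        JobID         string
        TaskGroupName string
        AllocID       string
        VolumeName    string
        VolumeID      string

        CreateIndex uint64
        ModifyIndex uint64
        CreateTime  time.Time
        ModifyTime  time.Time
    }

    // ClaimedByAlloc reports whether an incoming claim refers to the same
    // allocation and volume as the existing one (inferred from the upsert logic,
    // which treats a matching claim as a no-op).
    func (t *TaskGroupVolumeClaim) ClaimedByAlloc(other *TaskGroupVolumeClaim) bool {
        return t.AllocID == other.AllocID && t.VolumeID == other.VolumeID
    }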