diff --git a/Documentation/unit-files-and-scheduling.md b/Documentation/unit-files-and-scheduling.md index 34f5e90fd..6e97bd8c2 100644 --- a/Documentation/unit-files-and-scheduling.md +++ b/Documentation/unit-files-and-scheduling.md @@ -23,6 +23,7 @@ Note that these requirements are derived directly from systemd, with the only ex | `MachineMetadata` | Limit eligible machines to those with this specific metadata. | | `Conflicts` | Prevent a unit from being collocated with other units using glob-matching on the other unit names. | | `Global` | Schedule this unit on all agents in the cluster. A unit is considered invalid if options other than `MachineMetadata` are provided alongside `Global=true`. | +| `Replaces` | Schedule this unit on a different machine than the specified unit, rescheduling the specified unit to another machine if necessary. A unit is considered invalid if options `Global` or `Conflicts` are provided alongside `Replaces=`. A circular replacement between multiple units is not allowed. | See [more information][unit-scheduling] on these parameters and how they impact scheduling decisions. 
diff --git a/agent/reconcile_test.go b/agent/reconcile_test.go index 7e619b9ff..8fcad27ad 100644 --- a/agent/reconcile_test.go +++ b/agent/reconcile_test.go @@ -386,6 +386,30 @@ func TestAbleToRun(t *testing.T) { job: newTestJobWithXFleetValues(t, "Conflicts=ping.service"), want: false, }, + + // no replaces found + { + dState: &AgentState{ + MState: &machine.MachineState{ID: "123"}, + Units: map[string]*job.Unit{ + "ping.service": &job.Unit{Name: "ping.service"}, + }, + }, + job: newTestJobWithXFleetValues(t, "Replaces=pong.service"), + want: true, + }, + + // replaces found + { + dState: &AgentState{ + MState: &machine.MachineState{ID: "123"}, + Units: map[string]*job.Unit{ + "ping.service": &job.Unit{Name: "ping.service"}, + }, + }, + job: newTestJobWithXFleetValues(t, "Replaces=ping.service"), + want: false, + }, } for i, tt := range tests { diff --git a/agent/state.go b/agent/state.go index f4c3d1e94..b1108780a 100644 --- a/agent/state.go +++ b/agent/state.go @@ -66,6 +66,48 @@ func (as *AgentState) hasConflict(pUnitName string, pConflicts []string) (found return } +// hasReplace determines whether there are any known replaces with the given Unit +func (as *AgentState) hasReplace(pUnitName string, pReplaces []string) (found bool, replace string) { + for _, eUnit := range as.Units { + foundPrepl := false + foundErepl := false + retStr := "" + + if pUnitName == eUnit.Name { + continue + } + + for _, pReplace := range pReplaces { + if globMatches(pReplace, eUnit.Name) { + foundPrepl = true + retStr = eUnit.Name + break + } + } + + for _, eReplace := range eUnit.Replaces() { + if globMatches(eReplace, pUnitName) { + foundErepl = true + retStr = eUnit.Name + break + } + } + + // Only 1 of 2 matches must be found. If both matches are found, + // it means it's a circular replace situation, which could result in + // an infinite loop. So ignore such replace options. 
+ if (foundPrepl && foundErepl) || (!foundPrepl && !foundErepl) { + continue + } else { + found = true + replace = retStr + return + } + } + + return +} + func globMatches(pattern, target string) bool { matched, err := path.Match(pattern, target) if err != nil { @@ -81,6 +123,7 @@ func globMatches(pattern, target string) bool { // - Agent must have all of the Job's required metadata (if any) // - Agent must have all required Peers of the Job scheduled locally (if any) // - Job must not conflict with any other Units scheduled to the agent +// - Job must specially handle replaced units to be rescheduled func (as *AgentState) AbleToRun(j *job.Job) (bool, string) { if tgt, ok := j.RequiredTarget(); ok && !as.MState.MatchID(tgt) { return false, fmt.Sprintf("agent ID %q does not match required %q", as.MState.ID, tgt) @@ -106,5 +149,11 @@ func (as *AgentState) AbleToRun(j *job.Job) (bool, string) { return false, fmt.Sprintf("found conflict with locally-scheduled Unit(%s)", cJobName) } + // Handle Replace option specially, by returning a special string + // "jobreschedule" as reason. 
+ if cExists, _ := as.hasReplace(j.Name, j.Replaces()); cExists { + return false, job.JobReschedule + } + return true, "" } diff --git a/agent/state_test.go b/agent/state_test.go index 1295cebb4..8afc92316 100644 --- a/agent/state_test.go +++ b/agent/state_test.go @@ -98,6 +98,83 @@ func TestHasConflicts(t *testing.T) { } } +func TestHasReplaces(t *testing.T) { + tests := []struct { + cState *AgentState + job *job.Job + want bool + replace string + }{ + // empty current state causes no replaces + { + cState: NewAgentState(&machine.MachineState{ID: "XXX"}), + job: &job.Job{Name: "foo.service", Unit: fleetUnit(t, "Replaces=bar.service")}, + want: false, + }, + + // existing Job has replace with new Job + { + cState: &AgentState{ + MState: &machine.MachineState{ID: "XXX"}, + Units: map[string]*job.Unit{ + "bar.service": &job.Unit{ + Name: "bar.service", + Unit: fleetUnit(t, "Replaces=foo.service"), + }, + }, + }, + job: &job.Job{Name: "foo.service", Unit: unit.UnitFile{}}, + want: true, + replace: "bar.service", + }, + + // new Job has replace with existing job + { + cState: &AgentState{ + MState: &machine.MachineState{ID: "XXX"}, + Units: map[string]*job.Unit{ + "bar.service": &job.Unit{ + Name: "bar.service", + Unit: unit.UnitFile{}, + }, + }, + }, + job: &job.Job{Name: "foo.service", Unit: fleetUnit(t, "Replaces=bar.service")}, + want: true, + replace: "bar.service", + }, + + // both jobs have replace with each other: it should fail + { + cState: &AgentState{ + MState: &machine.MachineState{ID: "XXX"}, + Units: map[string]*job.Unit{ + "bar.service": &job.Unit{ + Name: "bar.service", + Unit: fleetUnit(t, "Replaces=foo.service"), + }, + }, + }, + job: &job.Job{Name: "foo.service", Unit: fleetUnit(t, "Replaces=bar.service")}, + want: false, + replace: "bar.service", + }, + } + + for i, tt := range tests { + got, replace := tt.cState.hasReplace(tt.job.Name, tt.job.Replaces()) + if got != tt.want { + var msg string + if !tt.want { + msg = fmt.Sprintf("expected no 
replace, found replace with Job %q", replace) + } else { + msg = fmt.Sprintf("expected replace with Job %q, got none", replace) + } + t.Errorf("case %d: %s", i, msg) + } + } +} + func TestGlobMatches(t *testing.T) { tests := []struct { pattern string diff --git a/api/units.go b/api/units.go index 6f37f71f8..1f0b59e93 100644 --- a/api/units.go +++ b/api/units.go @@ -223,6 +223,7 @@ func ValidateOptions(opts []*schema.UnitOption) error { Unit: *uf, } conflicts := pkg.NewUnsafeSet(j.Conflicts()...) + replaces := pkg.NewUnsafeSet(j.Replaces()...) peers := pkg.NewUnsafeSet(j.Peers()...) for _, peer := range peers.Values() { for _, conflict := range conflicts.Values() { @@ -231,9 +232,16 @@ func ValidateOptions(opts []*schema.UnitOption) error { return fmt.Errorf("unresolvable requirements: peer %q matches conflict %q", peer, conflict) } } + for _, replace := range replaces.Values() { + matched, _ := path.Match(replace, peer) + if matched { + return fmt.Errorf("unresolvable requirements: peer %q matches replace %q", peer, replace) + } + } } hasPeers := peers.Length() != 0 hasConflicts := conflicts.Length() != 0 + hasReplaces := replaces.Length() != 0 _, hasReqTarget := j.RequiredTarget() u := &job.Unit{ Unit: *uf, @@ -247,10 +255,16 @@ func ValidateOptions(opts []*schema.UnitOption) error { return errors.New("MachineID cannot be used with Conflicts") case hasReqTarget && isGlobal: return errors.New("MachineID cannot be used with Global") + case hasReqTarget && hasReplaces: + return errors.New("MachineID cannot be used with Replaces") case isGlobal && hasPeers: return errors.New("Global cannot be used with Peers") case isGlobal && hasConflicts: return errors.New("Global cannot be used with Conflicts") + case isGlobal && hasReplaces: + return errors.New("Global cannot be used with Replaces") + case hasConflicts && hasReplaces: + return errors.New("Conflicts cannot be used with Replaces") } return nil diff --git a/api/units_test.go b/api/units_test.go index 
85a341fd8..820aeac5e 100644 --- a/api/units_test.go +++ b/api/units_test.go @@ -454,6 +454,14 @@ func makeConflictUO(name string) *schema.UnitOption { } } +func makeReplaceUO(name string) *schema.UnitOption { + return &schema.UnitOption{ + Section: "X-Fleet", + Name: "Replaces", + Value: name, + } +} + func makePeerUO(name string) *schema.UnitOption { return &schema.UnitOption{ Section: "X-Fleet", @@ -543,6 +551,39 @@ func TestValidateOptions(t *testing.T) { }, false, }, + // Replaces + // non-overlapping replace is fine + { + []*schema.UnitOption{ + makeReplaceUO("foo.service"), + makeReplaceUO("bar.service"), + }, + true, + }, + { + []*schema.UnitOption{ + makeReplaceUO("foo.service"), + makePeerUO("bar.service"), + }, + true, + }, + // replace + conflict is not good + { + []*schema.UnitOption{ + makeReplaceUO("foo.service"), + makeConflictUO("bar.service"), + }, + false, + }, + // circular replaces are not good + { + []*schema.UnitOption{ + makeReplaceUO("foo.service"), + makeReplaceUO("bar.service"), + makePeerUO("bar.service"), + }, + false, + }, // MachineID is fine by itself { []*schema.UnitOption{ @@ -576,7 +617,25 @@ func TestValidateOptions(t *testing.T) { }, { []*schema.UnitOption{ - makeIDUO("zyxwvutsr"), makeConflictUO("foo.service"), makeConflictUO("bar.service"), + makeIDUO("zyxwvutsr"), + makeConflictUO("foo.service"), + makeConflictUO("bar.service"), + }, + false, + }, + // MachineID with Replaces no good + { + []*schema.UnitOption{ + makeIDUO("abcdefghi"), + makeReplaceUO("bar.service"), + }, + false, + }, + { + []*schema.UnitOption{ + makeIDUO("zyxwvutsr"), + makeReplaceUO("foo.service"), + makeReplaceUO("bar.service"), }, false, }, @@ -649,6 +708,18 @@ func TestValidateOptions(t *testing.T) { }, false, }, + // Global with Replaces no good + { + []*schema.UnitOption{ + &schema.UnitOption{ + Section: "X-Fleet", + Name: "Global", + Value: "true", + }, + makeReplaceUO("foo.service"), + }, + false, + }, } for i, tt := range testCases { err := 
ValidateOptions(tt.opts) diff --git a/engine/reconciler.go b/engine/reconciler.go index 4836bc2b1..6552d821a 100644 --- a/engine/reconciler.go +++ b/engine/reconciler.go @@ -110,10 +110,15 @@ func (r *Reconciler) calculateClusterTasks(clust *clusterState, stopchan chan st } var able bool - if able, reason = as.AbleToRun(j); !able { + var ableReason string + if able, ableReason = as.AbleToRun(j); !able { unschedule = true - reason = fmt.Sprintf("target Machine(%s) unable to run unit", j.TargetMachineID) - metrics.ReportEngineReconcileFailure(metrics.RunFailure) + if ableReason == job.JobReschedule { + reason = ableReason + } else { + reason = fmt.Sprintf("target Machine(%s) unable to run unit", j.TargetMachineID) + metrics.ReportEngineReconcileFailure(metrics.RunFailure) + } return } diff --git a/fleetctl/fleetctl_test.go b/fleetctl/fleetctl_test.go index ea6ff075d..b23b2558e 100644 --- a/fleetctl/fleetctl_test.go +++ b/fleetctl/fleetctl_test.go @@ -327,6 +327,24 @@ MachineOf=zxcvq`), Global=true Conflicts=bar`), }, + { + "foo.service", + newUnitFile(t, `[X-Fleet] +Global=true +Replaces=bar`), + }, + { + "foo.service", + newUnitFile(t, `[X-Fleet] +Conflicts=bar +Replaces=bar`), + }, + { + "foo.service", + newUnitFile(t, `[X-Fleet] +MachineOf=abcd +Replaces=abcd`), + }, } for i, tt = range testCases { un = tt.name diff --git a/functional/fixtures/units/replace.0.service b/functional/fixtures/units/replace.0.service new file mode 100644 index 000000000..94e06cf90 --- /dev/null +++ b/functional/fixtures/units/replace.0.service @@ -0,0 +1,5 @@ +[Unit] +Description=Test Unit + +[Service] +ExecStart=/bin/bash -c "while true; do echo Hello, World!; sleep 1; done" diff --git a/functional/fixtures/units/replace.1.service b/functional/fixtures/units/replace.1.service new file mode 100644 index 000000000..5f1d5a448 --- /dev/null +++ b/functional/fixtures/units/replace.1.service @@ -0,0 +1,8 @@ +[Unit] +Description=Test Unit + +[Service] +ExecStart=/bin/bash -c "while true; do 
echo Hello, World!; sleep 1; done" + +[X-Fleet] +Replaces=replace.0.service diff --git a/functional/scheduling_test.go b/functional/scheduling_test.go index a510b977a..5f71671e4 100644 --- a/functional/scheduling_test.go +++ b/functional/scheduling_test.go @@ -17,6 +17,7 @@ package functional import ( "fmt" "os" + "path" "path/filepath" "strings" "testing" @@ -320,6 +321,145 @@ func TestScheduleOneWayConflict(t *testing.T) { } +// TestScheduleReplace starts 1 unit, followed by starting another unit +// that replaces the 1st unit. Then it verifies that the 2 units are +// started on different machines. +func TestScheduleReplace(t *testing.T) { + cluster, err := platform.NewNspawnCluster("smoke") + if err != nil { + t.Fatal(err) + } + defer cluster.Destroy(t) + + // Start with a simple two-node cluster + members, err := platform.CreateNClusterMembers(cluster, 2) + if err != nil { + t.Fatal(err) + } + m0 := members[0] + if _, err := cluster.WaitForNMachines(m0, 2); err != nil { + t.Fatal(err) + } + + // Start a unit without Replaces + uNames := []string{ + "fixtures/units/replace.0.service", + "fixtures/units/replace.1.service", + } + if stdout, stderr, err := cluster.Fleetctl(m0, "start", "--no-block", uNames[0]); err != nil { + t.Fatalf("Failed starting unit %s: \nstdout: %s\nstderr: %s\nerr: %v", uNames[0], stdout, stderr, err) + } + + active, err := cluster.WaitForNActiveUnits(m0, 1) + if err != nil { + t.Fatal(err) + } + _, err = util.ActiveToSingleStates(active) + if err != nil { + t.Fatal(err) + } + + // Start a unit that replaces the former one, replace.0.service + if stdout, stderr, err := cluster.Fleetctl(m0, "start", "--no-block", uNames[1]); err != nil { + t.Fatalf("Failed starting unit %s: \nstdout: %s\nstderr: %s\nerr: %v", uNames[1], stdout, stderr, err) + } + + // Check that both units should show up + stdout, _, err := cluster.Fleetctl(m0, "list-unit-files", "--no-legend") + if err != nil { + t.Fatalf("Failed to run list-unit-files: %v", err) + } + 
units := strings.Split(strings.TrimSpace(stdout), "\n") + if len(units) != 2 { + t.Fatalf("Did not find two units in cluster: \n%s", stdout) + } + active, err = cluster.WaitForNActiveUnits(m0, 2) + if err != nil { + t.Fatal(err) + } + states, err := util.ActiveToSingleStates(active) + if err != nil { + t.Fatal(err) + } + + // Check that the unit 1 is located on a different machine from that of unit 0 + nUnits := 2 + uNameBase := make([]string, nUnits) + machs := make([]string, nUnits) + for i, uName := range uNames { + uNameBase[i] = path.Base(uName) + machs[i] = states[uNameBase[i]].Machine + } + if machs[0] == machs[1] { + t.Fatalf("machine for %s is %s, the same as that of %s.", uNameBase[0], machs[0], uNameBase[1]) + } + + // Check that circular replaces end up with 1 launched unit. + // First of all, destroy the existing unit replace.0.service. + if stdout, stderr, err := cluster.Fleetctl(m0, "destroy", uNameBase[0]); err != nil { + t.Fatalf("Failed to destroy unit %s: \nstdout: %s\nstderr: %s\nerr: %v", + uNameBase[0], stdout, stderr, err) + } + + // Generate a new service 0 derived from a fixture, make the new service + // replace service 1, and store it under /tmp. + uName0tmp := path.Join("/tmp", uNameBase[0]) + err = util.GenNewFleetService(uName0tmp, uNames[1], + "Replaces=replace.1.service", "Replaces=replace.0.service") + if err != nil { + t.Fatalf("Failed to generate a temp fleet service: %v", err) + } + + // Start replace.0 unit that replaces replace.1.service, + // then fleetctl list-unit-files should return only 1 launched unit. + // Note that we still need to run list-units once, before doing + // list-unit-files, for reliable tests. 
+ stdout, stderr, err := cluster.Fleetctl(m0, "start", "--no-block", uName0tmp) + if err != nil { + t.Fatalf("Failed starting unit %s: \nstdout: %s\nstderr: %s\nerr: %v", + uName0tmp, stdout, stderr, err) + } + + stdout, _, err = cluster.Fleetctl(m0, "list-unit-files", "--no-legend") + if err != nil { + t.Fatalf("Failed to run list-unit-files: %v", err) + } + units = strings.Split(strings.TrimSpace(stdout), "\n") + if len(units) != nUnits { + t.Fatalf("Did not find two units in cluster: \n%s", stdout) + } + _, err = cluster.WaitForNActiveUnits(m0, nUnits) + if err != nil { + t.Fatal(err) + } + ufs, err := cluster.WaitForNUnitFiles(m0, nUnits) + if err != nil { + t.Fatalf("Failed to run list-unit-files: %v", err) + } + + uStates := make([][]util.UnitFileState, nUnits) + var found bool + for i, unb := range uNameBase { + uStates[i], found = ufs[unb] + if len(ufs) != nUnits || !found { + t.Fatalf("Did not find %d launched unit as expected: got %d\n", nUnits, len(ufs)) + } + } + nLaunched := 0 + for _, us := range uStates { + for _, state := range us { + if strings.Contains(state.State, "launched") { + nLaunched += 1 + } + } + } + if nLaunched != 1 { + t.Fatalf("Did not find 1 launched unit as expected: got %d", nLaunched) + } + + os.Remove(uName0tmp) +} + // Ensure units can be scheduled directly to a given machine using the // MachineID unit option. func TestScheduleConditionMachineID(t *testing.T) { diff --git a/job/job.go b/job/job.go index f3adf4707..fc5723dc4 100644 --- a/job/job.go +++ b/job/job.go @@ -28,6 +28,8 @@ const ( JobStateInactive = JobState("inactive") JobStateLoaded = JobState("loaded") JobStateLaunched = JobState("launched") + + JobReschedule = "jobreschedule" ) // fleet-specific unit file requirement keys. @@ -42,6 +44,8 @@ const ( fleetMachineOf = "MachineOf" // Prevent a unit from being collocated with other units using glob-matching on the other unit names. 
fleetConflicts = "Conflicts" + // Reschedule a unit to another machine + fleetReplaces = "Replaces" + // Machine metadata key in the unit file fleetMachineMetadata = "MachineMetadata" + // Require that the unit be scheduled on every machine in the cluster @@ -63,6 +67,7 @@ var validRequirements = pkg.NewUnsafeSet( deprecatedXConditionPrefix+fleetMachineMetadata, fleetMachineMetadata, fleetGlobal, + fleetReplaces, ) func ParseJobState(s string) (JobState, error) { @@ -138,6 +143,14 @@ func (u *Unit) Conflicts() []string { return j.Conflicts() } +func (u *Unit) Replaces() []string { + j := &Job{ + Name: u.Name, + Unit: u.Unit, + } + return j.Replaces() +} + func (u *Unit) Peers() []string { j := &Job{ Name: u.Name, @@ -209,6 +222,14 @@ func (j *Job) Conflicts() []string { return conflicts } +// Replaces returns a list of Job names that should be scheduled on a +// different machine than this Job. +func (j *Job) Replaces() []string { + replaces := make([]string, 0) + replaces = append(replaces, j.requirements()[fleetReplaces]...) + return replaces +} + // Peers returns a list of Job names that must be scheduled to the same // machine as this Job. 
func (j *Job) Peers() []string { diff --git a/job/job_test.go b/job/job_test.go index df17b1bf6..7381dc63c 100644 --- a/job/job_test.go +++ b/job/job_test.go @@ -88,6 +88,29 @@ Conflicts=*bar* } } +func TestJobReplaces(t *testing.T) { + testCases := []struct { + contents string + replaces []string + }{ + {``, []string{}}, + {`[Unit] +Description=Testing + +[X-Fleet] +Replaces=*bar* +`, []string{"*bar*"}}, + } + for i, tt := range testCases { + j := NewJob("echo.service", *newUnit(t, tt.contents)) + replaces := j.Replaces() + if !reflect.DeepEqual(replaces, tt.replaces) { + t.Errorf("case %d: unexpected replaces: got %#v, want %#v", i, replaces, tt.replaces) + } + + } +} + func TestParseRequirements(t *testing.T) { testCases := []struct { contents string @@ -567,6 +590,7 @@ func TestValidateRequirements(t *testing.T) { "X-ConditionMachineMetadata=up=down", "MachineMetadata=true=false", "Global=true", + "Replaces=foo", } for i, req := range tests { contents := fmt.Sprintf("[X-Fleet]\n%s", req)