Skip to content

Commit

Permalink
Restart Enable chunk locations job
Browse files Browse the repository at this point in the history
  • Loading branch information
l0kix2 committed Jan 9, 2025
1 parent 8e15756 commit d7bf1b5
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 18 deletions.
1 change: 1 addition & 0 deletions api/v1/ytsaurus_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ const (
UpdateStateWaitingForPodsRemoval UpdateState = "WaitingForPodsRemoval"
UpdateStateWaitingForPodsCreation UpdateState = "WaitingForPodsCreation"
UpdateStateWaitingForMasterExitReadOnly UpdateState = "WaitingForMasterExitReadOnly"
UpdateStateWaitingForEnableRealChunkLocations UpdateState = "WaitingForEnableRealChunkLocations"
UpdateStateWaitingForTabletCellsRecovery UpdateState = "WaitingForTabletCellsRecovery"
UpdateStateWaitingForOpArchiveUpdatingPrepare UpdateState = "WaitingForOpArchiveUpdatingPrepare"
UpdateStateWaitingForOpArchiveUpdate UpdateState = "WaitingForOpArchiveUpdate"
Expand Down
9 changes: 9 additions & 0 deletions controllers/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ func (r *YtsaurusReconciler) handleEverything(
case ytv1.UpdateStateWaitingForTabletCellsRemoved:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionTabletCellsRemoved) {
ytsaurus.LogUpdate(ctx, "Waiting for snapshots")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForEnableRealChunkLocations)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForEnableRealChunkLocations:
// This stage may also be added to MasterOnly flow, but it makes sense only if
// data nodes are re-registered in master after this job, so I've added it only here.
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionRealChunkLocationsEnabled) {
ytsaurus.LogUpdate(ctx, "Real chunk locations enabled")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForSnapshots)
return &ctrl.Result{Requeue: true}, err
}
Expand Down
15 changes: 8 additions & 7 deletions controllers/ytsaurus_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,17 @@ func prepareTest(t *testing.T, namespace string) *testutil.TestHelper {
return h
}

func waitClusterState(h *testutil.TestHelper, expectedState ytv1.ClusterState) {
func waitClusterState(h *testutil.TestHelper, expectedState ytv1.ClusterState, expectedObservedGeneration int64) {
h.Logf("[ Wait for YTsaurus %s state ]", expectedState)
testutil.FetchAndCheckEventually(
h,
ytsaurusName,
&ytv1.Ytsaurus{},
fmt.Sprintf("cluster state is %s", expectedState),
fmt.Sprintf("cluster state is %s, gen is %d", expectedState, expectedObservedGeneration),
func(obj client.Object) bool {
state := obj.(*ytv1.Ytsaurus).Status.State
return state == expectedState
observedGen := obj.(*ytv1.Ytsaurus).Status.ObservedGeneration
return state == expectedState && observedGen == expectedObservedGeneration
},
)
}
Expand Down Expand Up @@ -107,7 +108,7 @@ func TestYtsaurusFromScratch(t *testing.T) {
return len(secret.Data["YT_TOKEN"]) != 0
},
)
waitClusterState(h, ytv1.ClusterStateRunning)
waitClusterState(h, ytv1.ClusterStateRunning, ytsaurusResource.Generation)
}

func TestYtsaurusUpdateStatelessComponent(t *testing.T) {
Expand All @@ -123,15 +124,15 @@ func TestYtsaurusUpdateStatelessComponent(t *testing.T) {
ytsaurusResource.Spec.DataNodes[0].MinReadyInstanceCount = ptr.To(0)
testutil.DeployObject(h, &ytsaurusResource)

waitClusterState(h, ytv1.ClusterStateRunning)
waitClusterState(h, ytv1.ClusterStateRunning, ytsaurusResource.Generation)

imageUpdated := testYtsaurusImage + "-updated"
ytsaurusResource.Spec.Discovery.Image = &imageUpdated
t.Log("[ Updating discovery with disabled full update ]")
ytsaurusResource.Spec.EnableFullUpdate = false
testutil.UpdateObject(h, &ytv1.Ytsaurus{}, &ytsaurusResource)

waitClusterState(h, ytv1.ClusterStateRunning)
waitClusterState(h, ytv1.ClusterStateRunning, ytsaurusResource.Generation)

sts := appsv1.StatefulSet{}
testutil.GetObject(h, "ds", &sts)
Expand All @@ -151,7 +152,7 @@ func TestYtsaurusUpdateMasterBlocked(t *testing.T) {
ytsaurusResource.Spec.DataNodes[0].MinReadyInstanceCount = ptr.To(0)
testutil.DeployObject(h, &ytsaurusResource)

waitClusterState(h, ytv1.ClusterStateRunning)
waitClusterState(h, ytv1.ClusterStateRunning, ytsaurusResource.Generation)

imageUpdated := testYtsaurusImage + "-updated"
ytsaurusResource.Spec.PrimaryMasters.Image = &imageUpdated
Expand Down
63 changes: 52 additions & 11 deletions pkg/components/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,14 @@ func (m *Master) doSync(ctx context.Context, dry bool) (ComponentStatus, error)

if m.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating {
if m.ytsaurus.GetUpdateState() == ytv1.UpdateStateWaitingForMasterExitReadOnly {
st, err := m.exitReadOnly(ctx, dry)
return *st, err
return m.exitReadOnly(ctx, dry)
}
if status, err := handleUpdatingClusterState(ctx, m.ytsaurus, m, &m.localComponent, m.server, dry); status != nil {
return *status, err
}
if m.ytsaurus.GetUpdateState() == ytv1.UpdateStateWaitingForEnableRealChunkLocations {
return m.restartEnableRealChunksJob(ctx, dry)
}
}

if m.NeedSync() {
Expand All @@ -339,7 +341,7 @@ func (m *Master) doSync(ctx context.Context, dry bool) (ComponentStatus, error)
return WaitingStatus(SyncStatusBlocked, "pods"), err
}

return m.runPostSyncJobs(ctx, dry)
return m.runInitPhaseJobs(ctx, dry)
}

func (m *Master) Status(ctx context.Context) (ComponentStatus, error) {
Expand Down Expand Up @@ -374,26 +376,25 @@ func (m *Master) getHostAddressLabel() string {
return defaultHostAddressLabel
}

func (m *Master) exitReadOnly(ctx context.Context, dry bool) (*ComponentStatus, error) {
func (m *Master) exitReadOnly(ctx context.Context, dry bool) (ComponentStatus, error) {
if !m.ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionMasterExitReadOnlyPrepared) {
if !m.exitReadOnlyJob.isRestartPrepared() {
if err := m.exitReadOnlyJob.prepareRestart(ctx, dry); err != nil {
return ptr.To(SimpleStatus(SyncStatusUpdating)), err
return SimpleStatus(SyncStatusUpdating), err
}
}

if !dry {
m.setMasterReadOnlyExitPrepared(ctx, metav1.ConditionTrue)
}
return ptr.To(SimpleStatus(SyncStatusUpdating)), nil
return SimpleStatus(SyncStatusUpdating), nil
}

if !m.exitReadOnlyJob.IsCompleted() {
if !dry {
m.exitReadOnlyJob.SetInitScript(m.createExitReadOnlyScript())
}
status, err := m.exitReadOnlyJob.Sync(ctx, dry)
return &status, err
return m.exitReadOnlyJob.Sync(ctx, dry)
}

if !dry {
Expand All @@ -405,7 +406,7 @@ func (m *Master) exitReadOnly(ctx context.Context, dry bool) (*ComponentStatus,
})
m.setMasterReadOnlyExitPrepared(ctx, metav1.ConditionFalse)
}
return ptr.To(SimpleStatus(SyncStatusUpdating)), nil
return SimpleStatus(SyncStatusUpdating), nil
}

func (m *Master) setMasterReadOnlyExitPrepared(ctx context.Context, status metav1.ConditionStatus) {
Expand All @@ -417,7 +418,7 @@ func (m *Master) setMasterReadOnlyExitPrepared(ctx context.Context, status metav
})
}

func (m *Master) runPostSyncJobs(ctx context.Context, dry bool) (ComponentStatus, error) {
func (m *Master) runInitPhaseJobs(ctx context.Context, dry bool) (ComponentStatus, error) {
st, err := m.runMasterInitJob(ctx, dry)
if err != nil {
return ComponentStatus{}, err
Expand All @@ -432,17 +433,57 @@ func (m *Master) runPostSyncJobs(ctx context.Context, dry bool) (ComponentStatus
return st, nil
}

// runMasterInitJob launches job only once in an Initialization phase.
func (m *Master) runMasterInitJob(ctx context.Context, dry bool) (ComponentStatus, error) {
if !dry {
m.initJob.SetInitScript(m.createInitScript())
}
return m.initJob.Sync(ctx, dry)
}

// runEnableRealChunksJob launches job in the Initialization and Updating phases.
func (m *Master) runEnableRealChunksJob(ctx context.Context, dry bool) (ComponentStatus, error) {
// TODO: prepare restart, etc
if !dry {
m.enableRealChunksJob.SetInitScript(m.createEnableRealChunksScript())
}
return m.enableRealChunksJob.Sync(ctx, dry)
}

func (m *Master) restartEnableRealChunksJob(ctx context.Context, dry bool) (ComponentStatus, error) {
if !m.ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionRealChunkLocationsEnablePrepared) {
if !dry {
if !m.enableRealChunksJob.isRestartPrepared() {
if err := m.enableRealChunksJob.prepareRestart(ctx, dry); err != nil {
return ComponentStatus{}, err
}
}
m.ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{
Type: consts.ConditionRealChunkLocationsEnablePrepared,
Status: metav1.ConditionTrue,
Reason: "RealChunkLocationsEnablePrepared",
Message: "Enable real chunk locations job prepared to restart",
})
}
return WaitingStatus(SyncStatusPending, "reconciliation"), nil
}

if !m.enableRealChunksJob.IsCompleted() {
return m.runEnableRealChunksJob(ctx, dry)
}

if !dry {
m.ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{
Type: consts.ConditionRealChunkLocationsEnabled,
Status: metav1.ConditionTrue,
Reason: "RealChunksLocationsEnabled",
Message: "Enable real-chunk locations job is finished",
})
m.ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{
Type: consts.ConditionRealChunkLocationsEnablePrepared,
Status: metav1.ConditionFalse,
Reason: "RealChunkLocationsEnablePrepared",
Message: "Enable real chunk locations job preparation reset after completion",
})
}
return WaitingStatus(SyncStatusPending, "reconciliation"), nil
}
2 changes: 2 additions & 0 deletions pkg/consts/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ const ConditionYqlaUpdated = "YqlaUpdated"
const ConditionYqlaPreparedForUpdating = "YqlaPreparedForUpdating"
const ConditionMasterExitReadOnlyPrepared = "MasterExitReadOnlyPrepared"
const ConditionMasterExitedReadOnly = "MasterExitedReadOnly"
const ConditionRealChunkLocationsEnablePrepared = "RealChunkLocationsEnablePrepared"
const ConditionRealChunkLocationsEnabled = "RealChunkLocationsEnabled"
const ConditionSafeModeDisabled = "SafeModeDisabled"

0 comments on commit d7bf1b5

Please sign in to comment.