Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable real chunks job #412

Merged
merged 5 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/v1/ytsaurus_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ const (
UpdateStateWaitingForPodsRemoval UpdateState = "WaitingForPodsRemoval"
UpdateStateWaitingForPodsCreation UpdateState = "WaitingForPodsCreation"
UpdateStateWaitingForMasterExitReadOnly UpdateState = "WaitingForMasterExitReadOnly"
UpdateStateWaitingForEnableRealChunkLocations UpdateState = "WaitingForEnableRealChunkLocations"
UpdateStateWaitingForTabletCellsRecovery UpdateState = "WaitingForTabletCellsRecovery"
UpdateStateWaitingForOpArchiveUpdatingPrepare UpdateState = "WaitingForOpArchiveUpdatingPrepare"
UpdateStateWaitingForOpArchiveUpdate UpdateState = "WaitingForOpArchiveUpdate"
Expand Down
9 changes: 9 additions & 0 deletions controllers/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ func (r *YtsaurusReconciler) handleEverything(
case ytv1.UpdateStateWaitingForTabletCellsRemoved:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionTabletCellsRemoved) {
ytsaurus.LogUpdate(ctx, "Waiting for snapshots")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForEnableRealChunkLocations)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForEnableRealChunkLocations:
// This stage may also be added to MasterOnly flow, but it makes sense only if
// data nodes are re-registered in master after this job, so I've added it only here.
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionRealChunkLocationsEnabled) {
ytsaurus.LogUpdate(ctx, "Real chunk locations enabled")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForSnapshots)
return &ctrl.Result{Requeue: true}, err
}
Expand Down
147 changes: 123 additions & 24 deletions pkg/components/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ type Master struct {
localServerComponent
cfgen *ytconfig.Generator

initJob *InitJob
exitReadOnlyJob *InitJob
adminCredentials corev1.Secret
initJob *InitJob
enableRealChunksJob *InitJob
exitReadOnlyJob *InitJob
adminCredentials corev1.Secret
}

func NewMaster(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Master {
Expand All @@ -54,18 +55,36 @@ func NewMaster(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Master {
}),
)

jobImage := getImageWithDefault(resource.Spec.PrimaryMasters.InstanceSpec.Image, resource.Spec.CoreImage)
jobTolerations := getTolerationsWithDefault(resource.Spec.PrimaryMasters.Tolerations, resource.Spec.Tolerations)
jobNodeSelector := getNodeSelectorWithDefault(resource.Spec.PrimaryMasters.NodeSelector, resource.Spec.NodeSelector)
dnsConfig := getDNSConfigWithDefault(resource.Spec.PrimaryMasters.DNSConfig, resource.Spec.DNSConfig)
initJob := NewInitJob(
l,
ytsaurus.APIProxy(),
ytsaurus,
resource.Spec.ImagePullSecrets,
"default",
consts.ClientConfigFileName,
getImageWithDefault(resource.Spec.PrimaryMasters.InstanceSpec.Image, resource.Spec.CoreImage),
jobImage,
cfgen.GetNativeClientConfig,
getTolerationsWithDefault(resource.Spec.PrimaryMasters.Tolerations, resource.Spec.Tolerations),
getNodeSelectorWithDefault(resource.Spec.PrimaryMasters.NodeSelector, resource.Spec.NodeSelector),
getDNSConfigWithDefault(resource.Spec.PrimaryMasters.DNSConfig, resource.Spec.DNSConfig),
jobTolerations,
jobNodeSelector,
dnsConfig,
)

enableRealChunksJob := NewInitJob(
l,
ytsaurus.APIProxy(),
ytsaurus,
resource.Spec.ImagePullSecrets,
"enableRealChunks",
consts.ClientConfigFileName,
jobImage,
cfgen.GetNativeClientConfig,
jobTolerations,
jobNodeSelector,
dnsConfig,
)

exitReadOnlyJob := NewInitJob(
Expand All @@ -75,17 +94,18 @@ func NewMaster(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Master {
resource.Spec.ImagePullSecrets,
"exit-read-only",
consts.ClientConfigFileName,
getImageWithDefault(resource.Spec.PrimaryMasters.InstanceSpec.Image, resource.Spec.CoreImage),
jobImage,
cfgen.GetNativeClientConfig,
getTolerationsWithDefault(resource.Spec.PrimaryMasters.Tolerations, resource.Spec.Tolerations),
getNodeSelectorWithDefault(resource.Spec.PrimaryMasters.NodeSelector, resource.Spec.NodeSelector),
getDNSConfigWithDefault(resource.Spec.PrimaryMasters.DNSConfig, resource.Spec.DNSConfig),
jobTolerations,
jobNodeSelector,
dnsConfig,
)

return &Master{
localServerComponent: newLocalServerComponent(l, ytsaurus, srv),
cfgen: cfgen,
initJob: initJob,
enableRealChunksJob: enableRealChunksJob,
exitReadOnlyJob: exitReadOnlyJob,
}
}
Expand All @@ -108,6 +128,7 @@ func (m *Master) Fetch(ctx context.Context) error {
return resources.Fetch(ctx,
m.server,
m.initJob,
m.enableRealChunksJob,
m.exitReadOnlyJob,
)
}
Expand Down Expand Up @@ -271,6 +292,17 @@ func (m *Master) createInitScript() string {
return strings.Join(script, "\n")
}

// createEnableRealChunksScript assembles the shell script executed by the
// enable-real-chunks job. The script consists of the native-driver prologue,
// a debug log level export, a version guard that exits with status 0 when
// $YTSAURUS_VERSION compares below "23.2" or above "24.1", and finally the
// command that sets //sys/@config/node_tracker/enable_real_chunk_locations
// to %true. Lines are joined with newlines.
func (m *Master) createEnableRealChunksScript() string {
	// NOTE(review): inside [[ ]] the < and > operators compare strings, not
	// numbers; presumably YTSAURUS_VERSION is a short "NN.N" value set by the
	// prologue — confirm before relying on this guard for other formats.
	const versionGuard = `[[ "$YTSAURUS_VERSION" < "23.2" || "$YTSAURUS_VERSION" > "24.1" ]] && echo "enable_real_chunk_locations is supported between versions 23.2 and 24.1, nothing to do" && exit 0`
	const enableCommand = "/usr/bin/yt set //sys/@config/node_tracker/enable_real_chunk_locations %true"

	lines := make([]string, 0, 4)
	lines = append(lines,
		initJobWithNativeDriverPrologue(),
		"export YT_LOG_LEVEL=DEBUG",
		versionGuard,
		enableCommand,
	)
	return strings.Join(lines, "\n")
}

func (m *Master) createExitReadOnlyScript() string {
script := []string{
initJobWithNativeDriverPrologue(),
Expand All @@ -292,8 +324,10 @@ func (m *Master) doSync(ctx context.Context, dry bool) (ComponentStatus, error)

if m.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating {
if m.ytsaurus.GetUpdateState() == ytv1.UpdateStateWaitingForMasterExitReadOnly {
st, err := m.exitReadOnly(ctx, dry)
return *st, err
return m.exitReadOnly(ctx, dry)
}
if m.ytsaurus.GetUpdateState() == ytv1.UpdateStateWaitingForEnableRealChunkLocations {
return m.restartEnableRealChunksJob(ctx, dry)
}
if status, err := handleUpdatingClusterState(ctx, m.ytsaurus, m, &m.localComponent, m.server, dry); status != nil {
return *status, err
Expand All @@ -311,11 +345,7 @@ func (m *Master) doSync(ctx context.Context, dry bool) (ComponentStatus, error)
return WaitingStatus(SyncStatusBlocked, "pods"), err
}

if !dry {
m.initJob.SetInitScript(m.createInitScript())
}

return m.initJob.Sync(ctx, dry)
return m.runInitPhaseJobs(ctx, dry)
}

func (m *Master) Status(ctx context.Context) (ComponentStatus, error) {
Expand Down Expand Up @@ -350,26 +380,25 @@ func (m *Master) getHostAddressLabel() string {
return defaultHostAddressLabel
}

func (m *Master) exitReadOnly(ctx context.Context, dry bool) (*ComponentStatus, error) {
func (m *Master) exitReadOnly(ctx context.Context, dry bool) (ComponentStatus, error) {
if !m.ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionMasterExitReadOnlyPrepared) {
if !m.exitReadOnlyJob.isRestartPrepared() {
if err := m.exitReadOnlyJob.prepareRestart(ctx, dry); err != nil {
return ptr.To(SimpleStatus(SyncStatusUpdating)), err
return SimpleStatus(SyncStatusUpdating), err
}
}

if !dry {
m.setMasterReadOnlyExitPrepared(ctx, metav1.ConditionTrue)
}
return ptr.To(SimpleStatus(SyncStatusUpdating)), nil
return SimpleStatus(SyncStatusUpdating), nil
}

if !m.exitReadOnlyJob.IsCompleted() {
if !dry {
m.exitReadOnlyJob.SetInitScript(m.createExitReadOnlyScript())
}
status, err := m.exitReadOnlyJob.Sync(ctx, dry)
return &status, err
return m.exitReadOnlyJob.Sync(ctx, dry)
}

if !dry {
Expand All @@ -381,7 +410,7 @@ func (m *Master) exitReadOnly(ctx context.Context, dry bool) (*ComponentStatus,
})
m.setMasterReadOnlyExitPrepared(ctx, metav1.ConditionFalse)
}
return ptr.To(SimpleStatus(SyncStatusUpdating)), nil
return SimpleStatus(SyncStatusUpdating), nil
}

func (m *Master) setMasterReadOnlyExitPrepared(ctx context.Context, status metav1.ConditionStatus) {
Expand All @@ -392,3 +421,73 @@ func (m *Master) setMasterReadOnlyExitPrepared(ctx context.Context, status metav
Message: "Masters are ready to exit read-only state",
})
}

// runInitPhaseJobs runs the initialization-phase jobs in order: the master
// init job first and, only after it reports SyncStatusReady, the
// enable-real-chunks job. If either job returns an error, that error is
// returned together with a zero ComponentStatus.
func (m *Master) runInitPhaseJobs(ctx context.Context, dry bool) (ComponentStatus, error) {
	initStatus, err := m.runMasterInitJob(ctx, dry)
	switch {
	case err != nil:
		return ComponentStatus{}, err
	case initStatus.SyncStatus != SyncStatusReady:
		// The init job is not ready yet; report its status unchanged.
		return initStatus, nil
	}

	chunksStatus, err := m.runEnableRealChunksJob(ctx, dry)
	if err != nil {
		return ComponentStatus{}, err
	}
	return chunksStatus, nil
}

// runMasterInitJob syncs the master init job, which is launched only once, in
// the Initialization phase. Outside of dry mode the init script is
// (re)generated and attached to the job before syncing.
func (m *Master) runMasterInitJob(ctx context.Context, dry bool) (ComponentStatus, error) {
	job := m.initJob
	if !dry {
		job.SetInitScript(m.createInitScript())
	}
	return job.Sync(ctx, dry)
}

// runEnableRealChunksJob syncs the enable-real-chunks job; it is used in both
// the Initialization and the Updating phases. Outside of dry mode the job's
// script is (re)generated and attached before syncing.
func (m *Master) runEnableRealChunksJob(ctx context.Context, dry bool) (ComponentStatus, error) {
	job := m.enableRealChunksJob
	if !dry {
		job.SetInitScript(m.createEnableRealChunksScript())
	}
	return job.Sync(ctx, dry)
}

// restartEnableRealChunksJob re-runs the enable-real-chunks job during a
// cluster update. It advances one step per call:
//
//  1. While ConditionRealChunkLocationsEnablePrepared is not true, prepare the
//     job for restart (if not prepared already) and set that condition to true.
//  2. While the job is not completed, sync it via runEnableRealChunksJob.
//  3. Once completed, set ConditionRealChunkLocationsEnabled to true and reset
//     ConditionRealChunkLocationsEnablePrepared to false.
//
// In dry mode no conditions are written and prepareRestart is not called.
// Steps 1 and 3 report a pending "reconciliation" waiting status.
func (m *Master) restartEnableRealChunksJob(ctx context.Context, dry bool) (ComponentStatus, error) {
	job := m.enableRealChunksJob

	switch {
	case !m.ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionRealChunkLocationsEnablePrepared):
		if dry {
			return WaitingStatus(SyncStatusPending, "reconciliation"), nil
		}
		if !job.isRestartPrepared() {
			// dry is always false on this path.
			if err := job.prepareRestart(ctx, dry); err != nil {
				return ComponentStatus{}, err
			}
		}
		m.ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{
			Type:    consts.ConditionRealChunkLocationsEnablePrepared,
			Status:  metav1.ConditionTrue,
			Reason:  "RealChunkLocationsEnablePrepared",
			Message: "Enable real chunk locations job prepared to restart",
		})
		return WaitingStatus(SyncStatusPending, "reconciliation"), nil

	case !job.IsCompleted():
		return m.runEnableRealChunksJob(ctx, dry)
	}

	if dry {
		return WaitingStatus(SyncStatusPending, "reconciliation"), nil
	}

	// The job is done: publish the result and clear the preparation marker.
	finished := metav1.Condition{
		Type:    consts.ConditionRealChunkLocationsEnabled,
		Status:  metav1.ConditionTrue,
		Reason:  "RealChunksLocationsEnabled",
		Message: "Enable real-chunk locations job is finished",
	}
	preparationReset := metav1.Condition{
		Type:    consts.ConditionRealChunkLocationsEnablePrepared,
		Status:  metav1.ConditionFalse,
		Reason:  "RealChunkLocationsEnablePrepared",
		Message: "Enable real chunk locations job preparation reset after completion",
	}
	m.ytsaurus.SetUpdateStatusCondition(ctx, finished)
	m.ytsaurus.SetUpdateStatusCondition(ctx, preparationReset)
	return WaitingStatus(SyncStatusPending, "reconciliation"), nil
}
2 changes: 2 additions & 0 deletions pkg/consts/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ const ConditionYqlaUpdated = "YqlaUpdated"
const ConditionYqlaPreparedForUpdating = "YqlaPreparedForUpdating"

// ConditionMasterExitReadOnlyPrepared is the update status condition that
// Master.exitReadOnly checks before running the exit-read-only job.
const ConditionMasterExitReadOnlyPrepared = "MasterExitReadOnlyPrepared"
const ConditionMasterExitedReadOnly = "MasterExitedReadOnly"

// ConditionRealChunkLocationsEnablePrepared is set to true by
// Master.restartEnableRealChunksJob once the enable-real-chunks job has been
// prepared for restart, and reset to false after the job completes.
const ConditionRealChunkLocationsEnablePrepared = "RealChunkLocationsEnablePrepared"

// ConditionRealChunkLocationsEnabled is set to true by
// Master.restartEnableRealChunksJob when the enable-real-chunks job has
// completed; the reconciler waits for it in the
// WaitingForEnableRealChunkLocations update state.
const ConditionRealChunkLocationsEnabled = "RealChunkLocationsEnabled"
const ConditionSafeModeDisabled = "SafeModeDisabled"
1 change: 1 addition & 0 deletions test/e2e/checks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controllers_test
func getInitializingStageJobNames() []string {
return []string{
"yt-master-init-job-default",
"yt-master-init-job-enablerealchunks",
"yt-client-init-job-user",
"yt-scheduler-init-job-user",
"yt-scheduler-init-job-op-archive",
Expand Down
19 changes: 19 additions & 0 deletions test/e2e/ytsaurus_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ var _ = Describe("Basic e2e test for Ytsaurus controller", Label("e2e"), func()
By("Waiting cluster update completes")
EventuallyYtsaurus(ctx, ytsaurus, upgradeTimeout).Should(HaveClusterStateRunning())
checkClusterBaseViability(ytClient)
checkChunkLocations(ytClient)

podsAfterFullUpdate := getComponentPods(ctx, namespace)
pods := getChangedPods(podsBeforeUpdate, podsAfterFullUpdate)
Expand Down Expand Up @@ -1116,6 +1117,24 @@ func checkPodLabels(ctx context.Context, namespace string) {
checkPodLabelCount(pods, "app.kubernetes.io/name", "yt-scheduler", 1)
}

// checkChunkLocations asserts that real chunk locations are in effect on the
// cluster: the node tracker config flag enable_real_chunk_locations must be
// true, and no entry listed under //sys/data_nodes may have its
// use_imaginary_chunk_locations attribute equal to true.
func checkChunkLocations(ytClient yt.Client) {
	// https://github.com/ytsaurus/ytsaurus-k8s-operator/issues/396
	// enable_real_chunk_locations is expected to be true for every currently
	// tested/supported version.
	const (
		enableFlagPath = "//sys/@config/node_tracker/enable_real_chunk_locations"
		imaginaryAttr  = "use_imaginary_chunk_locations"
	)

	var enabled bool
	Expect(ytClient.GetNode(ctx, ypath.Path(enableFlagPath), &enabled, nil)).Should(Succeed())
	Expect(enabled).Should(BeTrue())

	var dataNodes []yson.ValueWithAttrs
	listOptions := &yt.ListNodeOptions{
		Attributes: []string{imaginaryAttr},
	}
	Expect(ytClient.ListNode(ctx, ypath.Path("//sys/data_nodes"), &dataNodes, listOptions)).Should(Succeed())

	for i := range dataNodes {
		Expect(dataNodes[i].Attrs[imaginaryAttr]).ShouldNot(BeTrue())
	}
}

func createYtsaurusClient(ytsaurus *ytv1.Ytsaurus, namespace string) yt.Client {
By("Creating ytsaurus client")
g := ytconfig.NewGenerator(ytsaurus, "local")
Expand Down
Loading