Skip to content

Commit

Permalink
Add yqla update job
Browse files Browse the repository at this point in the history
  • Loading branch information
Marina Pereskokova committed Nov 6, 2024
1 parent 5d45249 commit cb134ad
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 13 deletions.
2 changes: 2 additions & 0 deletions api/v1/ytsaurus_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,8 @@ const (
UpdateStateWaitingForOpArchiveUpdate UpdateState = "WaitingForOpArchiveUpdate"
UpdateStateWaitingForQTStateUpdatingPrepare UpdateState = "WaitingForQTStateUpdatingPrepare"
UpdateStateWaitingForQTStateUpdate UpdateState = "WaitingForQTStateUpdate"
UpdateStateWaitingForYqlAUpdatingPrepare UpdateState = "WaitingForYqlAUpdatingPrepare"
UpdateStateWaitingForYqlAUpdate UpdateState = "WaitingForYqlAUpdate"
UpdateStateWaitingForSafeModeDisabled UpdateState = "WaitingForSafeModeDisabled"
)

Expand Down
9 changes: 8 additions & 1 deletion controllers/component_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type ComponentManager struct {
ytsaurus *apiProxy.Ytsaurus
allComponents []components.Component
queryTrackerComponent components.Component
yqlAgentComponent components.Component
schedulerComponent components.Component
status ComponentManagerStatus
}
Expand Down Expand Up @@ -121,8 +122,9 @@ func NewComponentManager(
allComponents = append(allComponents, qa)
}

var yqla components.Component
if resource.Spec.YQLAgents != nil {
yqla := components.NewYQLAgent(cfgen, ytsaurus, m)
yqla = components.NewYQLAgent(cfgen, ytsaurus, m)
allComponents = append(allComponents, yqla)
}

Expand Down Expand Up @@ -191,6 +193,7 @@ func NewComponentManager(
ytsaurus: ytsaurus,
allComponents: allComponents,
queryTrackerComponent: q,
yqlAgentComponent: yqla,
schedulerComponent: s,
status: status,
}, nil
Expand Down Expand Up @@ -250,6 +253,10 @@ func (cm *ComponentManager) needQueryTrackerUpdate() bool {
return cm.queryTrackerComponent != nil && components.IsUpdatingComponent(cm.ytsaurus, cm.queryTrackerComponent)
}

func (cm *ComponentManager) needYqlAgentUpdate() bool {
return cm.yqlAgentComponent != nil && components.IsUpdatingComponent(cm.ytsaurus, cm.yqlAgentComponent)
}

func (cm *ComponentManager) needSchedulerUpdate() bool {
return cm.schedulerComponent != nil && components.IsUpdatingComponent(cm.ytsaurus, cm.schedulerComponent)
}
Expand Down
48 changes: 44 additions & 4 deletions controllers/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ func (r *YtsaurusReconciler) handleEverything(
case ytv1.UpdateStateWaitingForQTStateUpdatingPrepare:
if !componentManager.needQueryTrackerUpdate() {
ytsaurus.LogUpdate(ctx, "Query tracker state update was skipped")
ytsaurus.LogUpdate(ctx, "Waiting for safe mode disabled")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForSafeModeDisabled)
ytsaurus.LogUpdate(ctx, "Waiting for yql agent env prepare for updating")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForYqlAUpdatingPrepare)
return &ctrl.Result{Requeue: true}, err
}
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionQTStatePreparedForUpdating) {
Expand All @@ -146,6 +146,26 @@ func (r *YtsaurusReconciler) handleEverything(

case ytv1.UpdateStateWaitingForQTStateUpdate:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionQTStateUpdated) {
ytsaurus.LogUpdate(ctx, "Waiting for yql agent env prepare for updating")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForYqlAUpdatingPrepare)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForYqlAUpdatingPrepare:
if !componentManager.needYqlAgentUpdate() {
ytsaurus.LogUpdate(ctx, "Yql agent env update was skipped")
ytsaurus.LogUpdate(ctx, "Waiting for safe mode disabled")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForSafeModeDisabled)
return &ctrl.Result{Requeue: true}, err
}
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionYqlAPreparedForUpdating) {
ytsaurus.LogUpdate(ctx, "Waiting for yql agent env updating to finish")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForYqlAUpdate)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForYqlAUpdate:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionYqlAUpdated) {
ytsaurus.LogUpdate(ctx, "Waiting for safe mode disabled")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForSafeModeDisabled)
return &ctrl.Result{Requeue: true}, err
Expand Down Expand Up @@ -213,8 +233,8 @@ func (r *YtsaurusReconciler) handleStateless(
case ytv1.UpdateStateWaitingForQTStateUpdatingPrepare:
if !componentManager.needQueryTrackerUpdate() {
ytsaurus.LogUpdate(ctx, "Query tracker state update was skipped")
ytsaurus.LogUpdate(ctx, "Finishing")
err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateUpdateFinishing)
ytsaurus.LogUpdate(ctx, "Waiting for yql agent env prepare for updating")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForYqlAUpdatingPrepare)
return &ctrl.Result{Requeue: true}, err
}
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionQTStatePreparedForUpdating) {
Expand All @@ -225,6 +245,26 @@ func (r *YtsaurusReconciler) handleStateless(

case ytv1.UpdateStateWaitingForQTStateUpdate:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionQTStateUpdated) {
ytsaurus.LogUpdate(ctx, "Waiting for yql agent env prepare for updating")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForYqlAUpdatingPrepare)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForYqlAUpdatingPrepare:
if !componentManager.needYqlAgentUpdate() {
ytsaurus.LogUpdate(ctx, "Yql agent env update was skipped")
ytsaurus.LogUpdate(ctx, "Finishing")
err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateUpdateFinishing)
return &ctrl.Result{Requeue: true}, err
}
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionYqlAPreparedForUpdating) {
ytsaurus.LogUpdate(ctx, "Waiting for yql agent env updating to finish")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForYqlAUpdate)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForYqlAUpdate:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionYqlAUpdated) {
ytsaurus.LogUpdate(ctx, "Finishing")
err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateUpdateFinishing)
return &ctrl.Result{Requeue: true}, err
Expand Down
105 changes: 97 additions & 8 deletions pkg/components/yql_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/apiproxy"
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/consts"
Expand All @@ -20,10 +21,11 @@ import (

type YqlAgent struct {
localServerComponent
cfgen *ytconfig.Generator
master Component
initEnvironment *InitJob
secret *resources.StringSecret
cfgen *ytconfig.Generator
master Component
initEnvironment *InitJob
updateEnvironment *InitJob
secret *resources.StringSecret
}

func NewYQLAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Component) *YqlAgent {
Expand Down Expand Up @@ -73,6 +75,18 @@ func NewYQLAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master
getTolerationsWithDefault(resource.Spec.YQLAgents.Tolerations, resource.Spec.Tolerations),
getNodeSelectorWithDefault(resource.Spec.YQLAgents.NodeSelector, resource.Spec.NodeSelector),
),
updateEnvironment: NewInitJob(
&l,
ytsaurus.APIProxy(),
ytsaurus,
resource.Spec.ImagePullSecrets,
"yql-agent-update-environment",
consts.ClientConfigFileName,
getImageWithDefault(resource.Spec.YQLAgents.Image, resource.Spec.CoreImage),
cfgen.GetNativeClientConfig,
getTolerationsWithDefault(resource.Spec.YQLAgents.Tolerations, resource.Spec.Tolerations),
getNodeSelectorWithDefault(resource.Spec.YQLAgents.NodeSelector, resource.Spec.NodeSelector),
),
secret: resources.NewStringSecret(
l.GetSecretName(),
&l,
Expand All @@ -94,6 +108,7 @@ func (yqla *YqlAgent) Fetch(ctx context.Context) error {
return resources.Fetch(ctx,
yqla.server,
yqla.initEnvironment,
yqla.updateEnvironment,
yqla.secret,
)
}
Expand All @@ -120,13 +135,22 @@ func (yqla *YqlAgent) createInitScript() string {
yqla.initUsers(),
"/usr/bin/yt add-member --member yql_agent --group superusers || true",
"/usr/bin/yt create document //sys/yql_agent/config --attributes '{value={}}' --recursive --ignore-existing",
fmt.Sprintf("/usr/bin/yt set //sys/@cluster_connection/yql_agent '{stages={production={channel={addresses=%v}}}}'", yqlAgentAddrs),
fmt.Sprintf("/usr/bin/yt set //sys/@cluster_connection/yql_agent '{stages={production={channel={disable_balancing_on_single_address=%%false;addresses=%v}}}}'", yqlAgentAddrs),
fmt.Sprintf("/usr/bin/yt get //sys/@cluster_connection | /usr/bin/yt set //sys/clusters/%s", yqla.labeller.GetClusterName()),
}

return strings.Join(script, "\n")
}

func (yqla *YqlAgent) createUpdateScript() string {
script := []string{
initJobWithNativeDriverPrologue(),
fmt.Sprintf("/usr/bin/yt set //sys/@cluster_connection/yql_agent/stages/production/channel/disable_balancing_on_single_address '%%false'"),
}

return strings.Join(script, "\n")
}

func (yqla *YqlAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, error) {
var err error

Expand All @@ -135,8 +159,23 @@ func (yqla *YqlAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er
}

if yqla.ytsaurus.GetClusterState() == ytv1.ClusterStateUpdating {
if status, err := handleUpdatingClusterState(ctx, yqla.ytsaurus, yqla, &yqla.localComponent, yqla.server, dry); status != nil {
return *status, err
if IsUpdatingComponent(yqla.ytsaurus, yqla) {
if yqla.ytsaurus.GetUpdateState() == ytv1.UpdateStateWaitingForPodsRemoval && IsUpdatingComponent(yqla.ytsaurus, yqla) {
if !dry {
err = removePods(ctx, yqla.server, &yqla.localComponent)
}
return WaitingStatus(SyncStatusUpdating, "pods removal"), err
}

if status, err := yqla.updateYqlA(ctx, dry); status != nil {
return *status, err
}
if yqla.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForPodsCreation &&
yqla.ytsaurus.GetUpdateState() != ytv1.UpdateStateWaitingForYqlAUpdate {
return NewComponentStatus(SyncStatusReady, "Nothing to do now"), err
}
} else {
return NewComponentStatus(SyncStatusReady, "Not updating component"), err
}
}

Expand Down Expand Up @@ -185,7 +224,57 @@ func (yqla *YqlAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er
yqla.initEnvironment.SetInitScript(yqla.createInitScript())
}

return yqla.initEnvironment.Sync(ctx, dry)
status, err := yqla.initEnvironment.Sync(ctx, dry)
if err != nil || status.SyncStatus != SyncStatusReady {
return status, err
}

if !dry {
yqla.updateEnvironment.SetInitScript(yqla.createUpdateScript())
}
return yqla.updateEnvironment.Sync(ctx, dry)
}

func (yqla *YqlAgent) updateYqlA(ctx context.Context, dry bool) (*ComponentStatus, error) {
var err error
switch yqla.ytsaurus.GetUpdateState() {
case ytv1.UpdateStateWaitingForYqlAUpdatingPrepare:
if !yqla.updateEnvironment.isRestartPrepared() {
return ptr.To(SimpleStatus(SyncStatusUpdating)), yqla.updateEnvironment.prepareRestart(ctx, dry)
}
if !dry {
yqla.setConditionYqlAPreparedForUpdating(ctx)
}
return ptr.To(SimpleStatus(SyncStatusUpdating)), err
case ytv1.UpdateStateWaitingForYqlAUpdate:
if !yqla.updateEnvironment.isRestartCompleted() {
return nil, nil
}
if !dry {
yqla.setConditionYqlAUpdated(ctx)
}
return ptr.To(SimpleStatus(SyncStatusUpdating)), err
default:
return nil, nil
}
}

func (yqla *YqlAgent) setConditionYqlAPreparedForUpdating(ctx context.Context) {
yqla.ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{
Type: consts.ConditionYqlAPreparedForUpdating,
Status: metav1.ConditionTrue,
Reason: "YqlAPreparedForUpdating",
Message: "Yql Agent state prepared for updating",
})
}

func (yqla *YqlAgent) setConditionYqlAUpdated(ctx context.Context) {
yqla.ytsaurus.SetUpdateStatusCondition(ctx, metav1.Condition{
Type: consts.ConditionYqlAUpdated,
Status: metav1.ConditionTrue,
Reason: "YqlAUpdated",
Message: "Yql Agent state updated",
})
}

func (yqla *YqlAgent) Status(ctx context.Context) (ComponentStatus, error) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/consts/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const ConditionOpArchivePreparedForUpdating = "OpArchivePreparedForUpdating"
const ConditionNotNecessaryToUpdateOpArchive = "NotNecessaryToUpdateOpArchive"
const ConditionQTStateUpdated = "QTStateUpdated"
const ConditionQTStatePreparedForUpdating = "QTStatePreparedForUpdating"
const ConditionYqlAUpdated = "YqlAUpdated"
const ConditionYqlAPreparedForUpdating = "YqlAPreparedForUpdating"
const ConditionMasterExitReadOnlyPrepared = "MasterExitReadOnlyPrepared"
const ConditionMasterExitedReadOnly = "MasterExitedReadOnly"
const ConditionSafeModeDisabled = "SafeModeDisabled"
9 changes: 9 additions & 0 deletions pkg/testutil/spec_builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ func WithQueryTracker(ytsaurus *ytv1.Ytsaurus) *ytv1.Ytsaurus {
return ytsaurus
}

func WithYqlAgent(ytsaurus *ytv1.Ytsaurus) *ytv1.Ytsaurus {
ytsaurus.Spec.YqlAgents = &ytv1.YqlAgentSpec{

Check failure on line 212 in pkg/testutil/spec_builders.go

View workflow job for this annotation

GitHub Actions / Run checks

ytsaurus.Spec.YqlAgents undefined (type "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1".YtsaurusSpec has no field or method YqlAgents)

Check failure on line 212 in pkg/testutil/spec_builders.go

View workflow job for this annotation

GitHub Actions / Run checks

undefined: ytv1.YqlAgentSpec) (typecheck)

Check failure on line 212 in pkg/testutil/spec_builders.go

View workflow job for this annotation

GitHub Actions / Run checks

ytsaurus.Spec.YqlAgents undefined (type "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1".YtsaurusSpec has no field or method YqlAgents)

Check failure on line 212 in pkg/testutil/spec_builders.go

View workflow job for this annotation

GitHub Actions / Run checks

undefined: ytv1.YqlAgentSpec) (typecheck)

Check failure on line 212 in pkg/testutil/spec_builders.go

View workflow job for this annotation

GitHub Actions / Run checks

ytsaurus.Spec.YqlAgents undefined (type "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1".YtsaurusSpec has no field or method YqlAgents)

Check failure on line 212 in pkg/testutil/spec_builders.go

View workflow job for this annotation

GitHub Actions / Run checks

undefined: ytv1.YqlAgentSpec (typecheck)

Check failure on line 212 in pkg/testutil/spec_builders.go

View workflow job for this annotation

GitHub Actions / Run checks

ytsaurus.Spec.YqlAgents undefined (type "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1".YtsaurusSpec has no field or method YqlAgents)

Check failure on line 212 in pkg/testutil/spec_builders.go

View workflow job for this annotation

GitHub Actions / Run compat tests / Run checks

ytsaurus.Spec.YqlAgents undefined (type "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1".YtsaurusSpec has no field or method YqlAgents)

Check failure on line 212 in pkg/testutil/spec_builders.go

View workflow job for this annotation

GitHub Actions / Run compat tests / Run checks

undefined: ytv1.YqlAgentSpec

Check failure on line 212 in pkg/testutil/spec_builders.go

View workflow job for this annotation

GitHub Actions / Run e2e tests for basic scenarios / Run checks

ytsaurus.Spec.YqlAgents undefined (type "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1".YtsaurusSpec has no field or method YqlAgents)

Check failure on line 212 in pkg/testutil/spec_builders.go

View workflow job for this annotation

GitHub Actions / Run e2e tests for basic scenarios / Run checks

undefined: ytv1.YqlAgentSpec

Check failure on line 212 in pkg/testutil/spec_builders.go

View workflow job for this annotation

GitHub Actions / Run unit tests / Run checks

ytsaurus.Spec.YqlAgents undefined (type "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1".YtsaurusSpec has no field or method YqlAgents)

Check failure on line 212 in pkg/testutil/spec_builders.go

View workflow job for this annotation

GitHub Actions / Run unit tests / Run checks

undefined: ytv1.YqlAgentSpec

Check failure on line 212 in pkg/testutil/spec_builders.go

View workflow job for this annotation

GitHub Actions / Run e2e tests for extra scenarios / Run checks

ytsaurus.Spec.YqlAgents undefined (type "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1".YtsaurusSpec has no field or method YqlAgents)

Check failure on line 212 in pkg/testutil/spec_builders.go

View workflow job for this annotation

GitHub Actions / Run e2e tests for extra scenarios / Run checks

undefined: ytv1.YqlAgentSpec
InstanceSpec: ytv1.InstanceSpec{
InstanceCount: 1,
},
}
return ytsaurus
}

func WithQueueAgent(ytsaurus *ytv1.Ytsaurus) *ytv1.Ytsaurus {
ytsaurus.Spec.QueueAgents = &ytv1.QueueAgentSpec{
InstanceSpec: ytv1.InstanceSpec{
Expand Down
27 changes: 27 additions & 0 deletions test/e2e/ytsaurus_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,33 @@ var _ = Describe("Basic e2e test for Ytsaurus controller", Label("e2e"), func()

}) // update query-tracker

Context("With yql agent", Label("yql-agent"), func() {

It("Should run with yql agent and check that yql agent channel options set up correctly", Label("basic"), func(ctx context.Context) {
By("Creating a Ytsaurus resource")

namespace := "yqlagentchannel"

ytsaurus := testutil.CreateBaseYtsaurusResource(namespace)
ytsaurus = testutil.WithYqlAgent(ytsaurus)

g := ytconfig.NewGenerator(ytsaurus, "local")

DeferCleanup(deleteYtsaurus, ytsaurus)
runYtsaurus(ytsaurus)

By("Creating ytsaurus client")
ytClient := getYtClient(g, namespace)

By("Check that yql agent channel exists in cluster_connection")
Expect(ytClient.NodeExists(ctx, ypath.Path("//sys/@cluster_connection/yql_agent/stages/production/channel"), nil)).Should(BeTrue())
result := true
Expect(ytClient.GetNode(ctx, ypath.Path("//sys/@cluster_connection/yql_agent/stages/production/channel/disable_balancing_on_single_address"), &result, nil)).Should(Succeed())
Expect(result).Should(BeFalse())
})

}) // update yql-agent

Context("With queue agent", Label("queue-agent"), func() {

It("Should run with query tracker and check that access control objects set up correctly", Label("basic"), func(ctx context.Context) {
Expand Down

0 comments on commit cb134ad

Please sign in to comment.