Skip to content
This repository has been archived by the owner on Sep 2, 2022. It is now read-only.

#420 #450

Closed
wants to merge 7 commits into from
Closed

#420 #450

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions api/v1beta1/flinkcluster_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,17 @@ func TestSetDefault(t *testing.T) {
var defaultJmBlobPort = int32(6124)
var defaultJmQueryPort = int32(6125)
var defaultJmUIPort = int32(8081)
var defaultJmIngressTLSUse = false
var defaultTmDataPort = int32(6121)
var defaultTmRPCPort = int32(6122)
var defaultTmQueryPort = int32(6125)
var defaultJobAllowNonRestoredState = false
var defaultJobParallelism = int32(1)
var defaultJobNoLoggingToStdout = false
var defaultJobRestartPolicy = JobRestartPolicyNever
var defatulJobManagerIngressTLSUse = false
var defaultMemoryOffHeapRatio = int32(25)
var defaultMemoryOffHeapMin = resource.MustParse("600M")
defaultRecreateOnUpdate := new(bool)
*defaultRecreateOnUpdate = true
var defaultRecreateOnUpdate = true
var expectedCluster = FlinkCluster{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{},
Expand All @@ -70,7 +69,7 @@ func TestSetDefault(t *testing.T) {
Replicas: &defaultJmReplicas,
AccessScope: "Cluster",
Ingress: &JobManagerIngressSpec{
UseTLS: &defatulJobManagerIngressTLSUse,
UseTLS: &defaultJmIngressTLSUse,
},
Ports: JobManagerPorts{
RPC: &defaultJmRPCPort,
Expand Down Expand Up @@ -115,7 +114,7 @@ func TestSetDefault(t *testing.T) {
MountPath: "/etc/hadoop/conf",
},
EnvVars: nil,
RecreateOnUpdate: defaultRecreateOnUpdate,
RecreateOnUpdate: &defaultRecreateOnUpdate,
},
Status: FlinkClusterStatus{},
}
Expand All @@ -134,23 +133,22 @@ func TestSetNonDefault(t *testing.T) {
var jmBlobPort = int32(8124)
var jmQueryPort = int32(8125)
var jmUIPort = int32(9081)
var jmIngressTLSUse = true
var tmDataPort = int32(8121)
var tmRPCPort = int32(8122)
var tmQueryPort = int32(8125)
var jobAllowNonRestoredState = true
var jobParallelism = int32(2)
var jobNoLoggingToStdout = true
var jobRestartPolicy = JobRestartPolicyFromSavepointOnFailure
var jobManagerIngressTLSUse = true
var memoryOffHeapRatio = int32(50)
var memoryOffHeapMin = resource.MustParse("600M")
var recreateOnUpdate = false
var securityContextUserGroup = int64(9999)
var securityContext = corev1.PodSecurityContext{
RunAsUser: &securityContextUserGroup,
RunAsGroup: &securityContextUserGroup,
}
defaultRecreateOnUpdate := new(bool)
*defaultRecreateOnUpdate = true
var cluster = FlinkCluster{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{},
Expand All @@ -164,7 +162,7 @@ func TestSetNonDefault(t *testing.T) {
Replicas: &jmReplicas,
AccessScope: "Cluster",
Ingress: &JobManagerIngressSpec{
UseTLS: &jobManagerIngressTLSUse,
UseTLS: &jmIngressTLSUse,
},
Ports: JobManagerPorts{
RPC: &jmRPCPort,
Expand Down Expand Up @@ -208,7 +206,8 @@ func TestSetNonDefault(t *testing.T) {
HadoopConfig: &HadoopConfig{
MountPath: "/opt/flink/hadoop/conf",
},
EnvVars: nil,
EnvVars: nil,
RecreateOnUpdate: &recreateOnUpdate,
},
Status: FlinkClusterStatus{},
}
Expand All @@ -228,7 +227,7 @@ func TestSetNonDefault(t *testing.T) {
Replicas: &jmReplicas,
AccessScope: "Cluster",
Ingress: &JobManagerIngressSpec{
UseTLS: &jobManagerIngressTLSUse,
UseTLS: &jmIngressTLSUse,
},
Ports: JobManagerPorts{
RPC: &jmRPCPort,
Expand Down Expand Up @@ -273,7 +272,7 @@ func TestSetNonDefault(t *testing.T) {
MountPath: "/opt/flink/hadoop/conf",
},
EnvVars: nil,
RecreateOnUpdate: defaultRecreateOnUpdate,
RecreateOnUpdate: &recreateOnUpdate,
},
Status: FlinkClusterStatus{},
}
Expand Down
123 changes: 71 additions & 52 deletions api/v1beta1/flinkcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,17 @@ const (

// JobState defines states for a Flink job deployment.
const (
JobStatePending = "Pending"
JobStateRunning = "Running"
JobStateUpdating = "Updating"
JobStateSucceeded = "Succeeded"
JobStateFailed = "Failed"
JobStateCancelled = "Cancelled"
JobStateSuspended = "Suspended"
JobStateUnknown = "Unknown"
JobStateLost = "Lost"
JobStatePending = "Pending"
JobStateUpdating = "Updating"
JobStateRestarting = "Restarting"
JobStateDeploying = "Deploying"
JobStateDeployFailed = "DeployFailed"
JobStateRunning = "Running"
JobStateSucceeded = "Succeeded"
JobStateCancelled = "Cancelled"
JobStateFailed = "Failed"
JobStateLost = "Lost"
JobStateUnknown = "Unknown"
)

// AccessScope defines the access scope of JobManager service.
Expand Down Expand Up @@ -85,24 +87,23 @@ const (
ControlNameJobCancel = "job-cancel"

// control state
ControlStateProgressing = "Progressing"
ControlStateSucceeded = "Succeeded"
ControlStateFailed = "Failed"
ControlStateRequested = "Requested"
ControlStateInProgress = "InProgress"
ControlStateSucceeded = "Succeeded"
ControlStateFailed = "Failed"
)

// Savepoint status
const (
SavepointStateNotTriggered = "NotTriggered"
SavepointStateInProgress = "InProgress"
SavepointStateTriggerFailed = "TriggerFailed"
SavepointStateFailed = "Failed"
SavepointStateSucceeded = "Succeeded"

SavepointTriggerReasonUserRequested = "user requested"
SavepointTriggerReasonScheduled = "scheduled"
SavepointTriggerReasonScheduledInitial = "scheduled initial" // The first triggered savepoint has slightly different flow
SavepointTriggerReasonJobCancel = "job cancel"
SavepointTriggerReasonUpdate = "update"
SavepointTriggerReasonUserRequested = "user requested"
SavepointTriggerReasonJobCancel = "job cancel"
SavepointTriggerReasonScheduled = "scheduled"
SavepointTriggerReasonUpdate = "update"
)

// ImageSpec defines Flink image of JobManager and TaskManager containers.
Expand Down Expand Up @@ -348,12 +349,20 @@ type JobSpec struct {
// Allow non-restored state, default: false.
AllowNonRestoredState *bool `json:"allowNonRestoredState,omitempty"`

// Should take savepoint before upgrading the job, default: false.
TakeSavepointOnUpgrade *bool `json:"takeSavepointOnUpgrade,omitempty"`

// Savepoints dir where to store savepoints of the job.
SavepointsDir *string `json:"savepointsDir,omitempty"`

// Should take savepoint before updating job, default: true.
// If this is set as false, maxStateAgeToRestoreSeconds must be provided to limit the savepoint age to restore.
TakeSavepointOnUpdate *bool `json:"takeSavepointOnUpdate,omitempty"`

// Maximum age of the savepoint that allowed to restore state..
// This is applied to auto restart on failure, update from stopped state and update without taking savepoint.
// If nil, job can be restarted only when the latest savepoint is the final job state (created by "stop with savepoint")
// - that is, only when job can be resumed from the suspended state.
// +kubebuilder:validation:Minimum=0
MaxStateAgeToRestoreSeconds *int32 `json:"maxStateAgeToRestoreSeconds,omitempty"`

// Automatically take a savepoint to the `savepointsDir` every n seconds.
AutoSavepointSeconds *int32 `json:"autoSavepointSeconds,omitempty"`

Expand Down Expand Up @@ -555,7 +564,7 @@ type JobStatus struct {
// The ID of the Flink job.
ID string `json:"id,omitempty"`

// The state of the Kubernetes job.
// The state of the Flink job deployment.
State string `json:"state"`

// The actual savepoint from which this job started.
Expand All @@ -571,21 +580,26 @@ type JobStatus struct {
// Savepoint location.
SavepointLocation string `json:"savepointLocation,omitempty"`

// Last savepoint trigger ID.
LastSavepointTriggerID string `json:"lastSavepointTriggerID,omitempty"`
// Last successful savepoint completed timestamp.
SavepointTime string `json:"savepointTime,omitempty"`

// Last savepoint trigger time. This is updated to make sure multiple
// savepoints will not be taken simultaneously.
LastSavepointTriggerTime string `json:"lastSavepointTriggerTime,omitempty"`
// The savepoint recorded in savepointLocation is the final state of the job.
FinalSavepoint bool `json:"finalSavepoint,omitempty"`

// Last successful or failed savepoint operation timestamp.
LastSavepointTime string `json:"lastSavepointTime,omitempty"`
// The timestamp of the Flink job deployment that creating job submitter.
DeployTime string `json:"deployTime,omitempty"`

// The Flink job started timestamp.
StartTime string `json:"startTime,omitempty"`

// The Flink job ended timestamp.
EndTime string `json:"endTime,omitempty"`

// The number of restarts.
RestartCount int32 `json:"restartCount,omitempty"`
}

// SavepointStatus defines the status of savepoint progress
// SavepointStatus is the status of savepoint progress.
type SavepointStatus struct {
// The ID of the Flink job.
JobID string `json:"jobID,omitempty"`
Expand All @@ -599,8 +613,8 @@ type SavepointStatus struct {
// Savepoint triggered reason.
TriggerReason string `json:"triggerReason,omitempty"`

// Savepoint requested time.
RequestTime string `json:"requestTime,omitempty"`
// Savepoint status update time.
UpdateTime string `json:"requestTime,omitempty"`

// Savepoint state.
State string `json:"state"`
Expand All @@ -609,6 +623,27 @@ type SavepointStatus struct {
Message string `json:"message,omitempty"`
}

type RevisionStatus struct {
// When the controller creates new ControllerRevision, it generates hash string from the FlinkCluster spec
// which is to be stored in ControllerRevision and uses it to compose the ControllerRevision name.
// Then the controller updates nextRevision to the ControllerRevision name.
// When update process is completed, the controller updates currentRevision as nextRevision.
// currentRevision and nextRevision is composed like this:
// <FLINK_CLUSTER_NAME>-<FLINK_CLUSTER_SPEC_HASH>-<REVISION_NUMBER_IN_CONTROLLERREVISION>
// e.g., myflinkcluster-c464ff7-5

// CurrentRevision indicates the version of FlinkCluster.
CurrentRevision string `json:"currentRevision,omitempty"`

// NextRevision indicates the version of FlinkCluster updating.
NextRevision string `json:"nextRevision,omitempty"`

// collisionCount is the count of hash collisions for the FlinkCluster. The controller
// uses this field as a collision avoidance mechanism when it needs to create the name for the
// newest ControllerRevision.
CollisionCount *int32 `json:"collisionCount,omitempty"`
}

// JobManagerIngressStatus defines the status of a JobManager ingress.
type JobManagerIngressStatus struct {
// The name of the Kubernetes ingress resource.
Expand Down Expand Up @@ -644,30 +679,14 @@ type FlinkClusterStatus struct {
// The status of the components.
Components FlinkClusterComponentsStatus `json:"components"`

// The status of control requested by user
// The status of control requested by user.
Control *FlinkClusterControlStatus `json:"control,omitempty"`

// The status of savepoint progress
// The status of savepoint progress.
Savepoint *SavepointStatus `json:"savepoint,omitempty"`

// When the controller creates new ControllerRevision, it generates hash string from the FlinkCluster spec
// which is to be stored in ControllerRevision and uses it to compose the ControllerRevision name.
// Then the controller updates nextRevision to the ControllerRevision name.
// When update process is completed, the controller updates currentRevision as nextRevision.
// currentRevision and nextRevision is composed like this:
// <FLINK_CLUSTER_NAME>-<FLINK_CLUSTER_SPEC_HASH>-<REVISION_NUMBER_IN_CONTROLLERREVISION>
// e.g., myflinkcluster-c464ff7-5

// CurrentRevision indicates the version of FlinkCluster.
CurrentRevision string `json:"currentRevision,omitempty"`

// NextRevision indicates the version of FlinkCluster updating.
NextRevision string `json:"nextRevision,omitempty"`

// collisionCount is the count of hash collisions for the FlinkCluster. The controller
// uses this field as a collision avoidance mechanism when it needs to create the name for the
// newest ControllerRevision.
CollisionCount *int32 `json:"collisionCount,omitempty"`
// The status of revision.
Revision RevisionStatus `json:"revision,omitempty"`

// Last update timestamp for this status.
LastUpdateTime string `json:"lastUpdateTime,omitempty"`
Expand Down
Loading