Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with flink application list command #2909

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions internal/flink/command_application_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package flink
import (
"github.com/spf13/cobra"

cmfsdk "github.com/confluentinc/cmf-sdk-go/v1"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/output"
)
Expand Down Expand Up @@ -43,23 +45,42 @@ func (c *command) applicationList(cmd *cobra.Command, _ []string) error {
if output.GetFormat(cmd) == output.Human {
list := output.NewList(cmd)
for _, app := range applications {
jobStatus, ok := app.Status["jobStatus"].(map[string]any)
if !ok {
jobStatus = map[string]any{}
}
envInApp, ok := app.Spec["environment"].(string)
if !ok {
envInApp = environment
}
list.Add(&flinkApplicationSummaryOut{
Name: app.Metadata["name"].(string),
Environment: envInApp,
JobName: jobStatus["jobName"].(string),
JobStatus: jobStatus["state"].(string),
})
appSummary := populateFlinkApplicationSummaryOut(app, environment)
list.Add(appSummary)
}
return list.Print()
}
// if the output format is not human, we serialize the output as it is (JSON or YAML)
return output.SerializedOutput(cmd, applications)
}

func populateFlinkApplicationSummaryOut(application cmfsdk.Application, envFromFlag string) *flinkApplicationSummaryOut {
vsantwana marked this conversation as resolved.
Show resolved Hide resolved
var appSummary *flinkApplicationSummaryOut

var jobStatus map[string]any = getOrDefault(application.Status, "jobStatus", map[string]any{})
jobNameString := getOrDefault(jobStatus, "jobName", "")
jobStatusString := getOrDefault(jobStatus, "state", "")
name := getOrDefault(application.Metadata, "name", "")
environment := getOrDefault(application.Spec, "flinkEnvironment", envFromFlag)

appSummary = &flinkApplicationSummaryOut{
Name: name,
Environment: environment,
JobName: jobNameString,
JobStatus: jobStatusString,
}

return appSummary
}

func getOrDefault[T any](m map[string]any, key string, d T) T {
value, ok := m[key]
if !ok {
return d
}
valueCast, ok := value.(T)
if !ok {
return d
}
return valueCast
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Name | Environment | Job Name | Job Status
----------------------+-------------+----------+-------------
new-env-application | new-env | |
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
[
{
"apiVersion": "cmf.confluent.io/v1alpha1",
"kind": "FlinkApplication",
"metadata": {
"name": "new-env-application"
},
"spec": {
"flinkConfiguration": {
"metrics.reporter.prom.factory.class": "org.apache.flink.metrics.prometheus.PrometheusReporterFactory",
"metrics.reporter.prom.port": "9249-9250",
"taskmanager.numberOfTaskSlots": "8"
},
"flinkEnvironmentName": "new-env",
"flinkVersion": "v1_19",
"image": "confluentinc/cp-flink:1.19.1-cp1",
"job": {
"jarURI": "local:///opt/flink/examples/streaming/StateMachineExample.jar",
"parallelism": 3,
"state": "running",
"upgradeMode": "stateless"
},
"jobManager": {
"resource": {
"cpu": 1,
"memory": "1048m"
}
},
"serviceAccount": "flink",
"taskManager": {
"resource": {
"cpu": 1,
"memory": "1048m"
}
}
},
"status": {
"clusterInfo": {
"flink-revision": "89d0b8f @ 2024-06-22T13:19:31+02:00",
"flink-version": "1.19.1-cp1",
"total-cpu": "3.0",
"total-memory": "3296722944"
},
"error": null,
"jobManagerDeploymentStatus": "DEPLOYING",
"jobStatus": {
"checkpointInfo": {
"formatType": null,
"lastCheckpoint": null,
"lastPeriodicCheckpointTimestamp": 0,
"triggerId": null,
"triggerTimestamp": null,
"triggerType": null
},
"jobId": "dcabb1ad6c40495bc2d7fa7a0097c5aa",
"savepointInfo": {
"formatType": null,
"lastPeriodicSavepointTimestamp": 0,
"lastSavepoint": null,
"savepointHistory": [],
"triggerId": null,
"triggerTimestamp": null,
"triggerType": null
},
"startTime": "1726640263746",
"updateTime": "1726640280561"
},
"lifecycleState": "DEPLOYED",
"observedGeneration": 4,
"reconciliationStatus": {
"lastReconciledSpec": "",
"lastStableSpec": "",
"reconciliationTimestamp": 1726640346899,
"state": "DEPLOYED"
},
"taskManager": {
"labelSelector": "component=taskmanager,app=basic-example",
"replicas": 1
}
}
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
- apiversion: cmf.confluent.io/v1alpha1
kind: FlinkApplication
metadata:
name: new-env-application
spec:
flinkConfiguration:
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249-9250
taskmanager.numberOfTaskSlots: "8"
flinkEnvironmentName: new-env
flinkVersion: v1_19
image: confluentinc/cp-flink:1.19.1-cp1
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 3
state: running
upgradeMode: stateless
jobManager:
resource:
cpu: 1
memory: 1048m
serviceAccount: flink
taskManager:
resource:
cpu: 1
memory: 1048m
status:
clusterInfo:
flink-revision: 89d0b8f @ 2024-06-22T13:19:31+02:00
flink-version: 1.19.1-cp1
total-cpu: "3.0"
total-memory: "3296722944"
error: null
jobManagerDeploymentStatus: DEPLOYING
jobStatus:
checkpointInfo:
formatType: null
lastCheckpoint: null
lastPeriodicCheckpointTimestamp: 0
triggerId: null
triggerTimestamp: null
triggerType: null
jobId: dcabb1ad6c40495bc2d7fa7a0097c5aa
savepointInfo:
formatType: null
lastPeriodicSavepointTimestamp: 0
lastSavepoint: null
savepointHistory: []
triggerId: null
triggerTimestamp: null
triggerType: null
startTime: "1726640263746"
updateTime: "1726640280561"
lifecycleState: DEPLOYED
observedGeneration: 4
reconciliationStatus:
lastReconciledSpec: ""
lastStableSpec: ""
reconciliationTimestamp: 1.726640346899e+12
state: DEPLOYED
taskManager:
labelSelector: component=taskmanager,app=basic-example
replicas: 1
3 changes: 3 additions & 0 deletions test/flink_onprem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ func (s *CLITestSuite) TestFlinkApplicationList() {
{args: "flink application list --environment test", fixture: "flink/application/list-empty-env.golden"},
{args: "flink application list --environment default --output json", fixture: "flink/application/list-json.golden"},
{args: "flink application list --environment default --output human", fixture: "flink/application/list-human.golden"},
{args: "flink application list --environment new-env", fixture: "flink/application/list-success-human-missing-attribute.golden"},
{args: "flink application list --environment new-env --output yaml", fixture: "flink/application/list-success-yaml-missing-attribute.golden"},
{args: "flink application list --environment new-env --output json", fixture: "flink/application/list-success-json-missing-attribute.golden"},
}

for _, test := range tests {
Expand Down
14 changes: 13 additions & 1 deletion test/test-server/flink_onprem_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func handleCmfApplications(t *testing.T) http.HandlerFunc {
environment := vars["environment"]
switch r.Method {
case http.MethodGet:
if environment != "default" && environment != "test" && environment != "update-failure" {
if environment != "default" && environment != "test" && environment != "update-failure" && environment != "new-env" {
http.Error(w, "Environment not found", http.StatusNotFound)
return
}
Expand Down Expand Up @@ -252,6 +252,18 @@ func handleCmfApplications(t *testing.T) http.HandlerFunc {
}
}

// for new-env, return an application where some of the fields are missing.
if environment == "new-env" && page == "0" {
newApplication := createApplication("new-env-application", "new-env")
delete(newApplication.Spec, "flinkEnvironment")
delete(newApplication.Status["jobStatus"].(map[string]interface{}), "jobName")
delete(newApplication.Status["jobStatus"].(map[string]interface{}), "state")
items := []cmfsdk.Application{newApplication}
applicationsPage = map[string]interface{}{
"items": items,
}
}

err := json.NewEncoder(w).Encode(applicationsPage)
require.NoError(t, err)
return
Expand Down