Commit e2e_add_k8s_native
zackyoungh committed Dec 24, 2024
1 parent 1f73bc9 commit 1610096
Showing 4 changed files with 98 additions and 14 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/k3s-test.yml
@@ -0,0 +1,20 @@
name: k3s test
on:
workflow_dispatch:
jobs:
k3s:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Init k3s
uses: nolar/setup-k3d-k3s@v1
with:
version: v1.21.2+k3s1
- name: Get k3s kube config
run: cat /etc/rancher/k3s/k3s.yaml && mkdir ./kube && cp /etc/rancher/k3s/k3s.yaml ./kube/k3s.yaml
- name: Init k8s RBAC and namespace
run: |
          kubectl create namespace dinky
          # create the service accounts that the bindings below (and the Dinky cluster config) reference
          kubectl create serviceaccount dinky -n dinky
          kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=dinky:dinky
          kubectl create serviceaccount flink-service-account -n dinky
          kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=dinky:flink-service-account
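
The workflow above only provisions k3s and the RBAC objects; it does not yet run the e2e suite against them. If a sanity check were wanted before the Python tooling below talks to the cluster, something along these lines could be added as a later step (a sketch only, assuming the kubernetes Python client is installed and that the kube config really ends up at ./kube/k3s.yaml as the "Get k3s kube config" step intends; neither is part of this commit):

# Hypothetical sanity check, not part of this commit.
# Assumes: pip install kubernetes, and ./kube/k3s.yaml written by the step above.
from kubernetes import client, config

config.load_kube_config(config_file="./kube/k3s.yaml")
v1 = client.CoreV1Api()

# The dinky namespace and service account are what the cluster configuration
# in e2e_test/tools/env.py expects to exist.
namespaces = [ns.metadata.name for ns in v1.list_namespace().items]
assert "dinky" in namespaces, "namespace dinky was not created"

service_accounts = [sa.metadata.name for sa in v1.list_namespaced_service_account("dinky").items]
assert "dinky" in service_accounts, "service account dinky is missing in namespace dinky"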
79 changes: 69 additions & 10 deletions e2e_test/tools/env.py
@@ -1,3 +1,5 @@
from typing import Optional

from requests import Session
import urllib.parse as urlparse
from hdfs.client import Client
@@ -29,7 +31,20 @@ def addStandaloneCluster(session: Session) -> int:
raise Exception(f"Cluster {name} not found")


def addYarnCluster(session: Session) -> int:
def addApplicationCluster(session: Session, params: dict) -> Optional[int]:
    name = params['name']
    test_connection_resp = session.post(url("api/clusterConfiguration/testConnect"), json=params)
    assertRespOk(test_connection_resp, f"Test connectivity of application cluster {name}")
    save_cluster_resp = session.put(url("api/clusterConfiguration/saveOrUpdate"), json=params)
    assertRespOk(save_cluster_resp, f"Add application cluster {name}")
    get_app_list = session.get(url(f"api/clusterConfiguration/list?keyword={name}"), json=params)
    assertRespOk(get_app_list, f"Get application cluster {name}")
    for data in get_app_list.json()["data"]:
        if data["name"] == name:
            return data['id']
    return None


def addYarnCluster(session: Session) -> Optional[int]:
client = Client("http://namenode:9870")
flink_lib_path = yarn_flink_lib
client.makedirs(flink_lib_path)
@@ -72,12 +87,56 @@ def addYarnCluster(session: Session) -> int:
}
}
log.info(f"Adding yarn application cluster, parameters:{params}")
test_connection_yarn_resp = session.post(url("api/clusterConfiguration/testConnect"), json=params)
assertRespOk(test_connection_yarn_resp, "Test yarn connectivity")
test_connection_yarn_resp = session.put(url("api/clusterConfiguration/saveOrUpdate"), json=params)
assertRespOk(test_connection_yarn_resp, "Add Yarn Application Cluster")
get_app_list = session.get(url(f"api/clusterConfiguration/list?keyword={name}"), json=params)
assertRespOk(get_app_list, "Get Yarn Application Cluster")
for data in get_app_list.json()["data"]:
if data["name"] == name:
return data['id']
return addApplicationCluster(session, params)


def addK8sNativeCluster(session: Session) -> Optional[int]:
name = "k8s-native-test"
params = {
"type": "kubernetes-application",
"name": name,
"enabled": True,
"config": {
"kubernetesConfig": {
"configuration": {
"kubernetes.rest-service.exposed.type": "NodePort",
"kubernetes.namespace": "dinky",
"kubernetes.service-account": "dinky",
"kubernetes.container.image": "registry.cn-hangzhou.aliyuncs.com/dinky-flink/dinky-flink:1.20.0-scala_2.12-java8"
},
"ingressConfig": {
"kubernetes.ingress.enabled": False
},
"kubeConfig": "---\r\napiVersion: v1\r\nclusters:\r\n- cluster:\r\n certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkakNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTXpRNU5EY3lOalF3SGhjTk1qUXhNakl6TURrME56UTBXaGNOTXpReE1qSXhNRGswTnpRMApXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTXpRNU5EY3lOalF3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFUb3UvcWJSMlpJbnJiSWZtaVk5YTlJT2V3UXBpbEhJdWZvTi93NjlsSDEKdHZpbW16azJlM2hSSmtreloreFptMWRXN3l1M2FuaVZnN3Z3dkEvMVV0Ri9vMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVXM0dDVucW1oSmhKQW5XNEdBS3BJClBCVnk2dWN3Q2dZSUtvWkl6ajBFQXdJRFJ3QXdSQUlnV2tiZ3JybkFMdmxGTldiLzFzTkNISGJhYUgwUmIxS3MKMTY0dVN0TFdPOThDSUZMMHdvcEtJdUZIU25iRVVxNzdDMWFwVXJlNG1Kend5dy9tM0VtcGo0emkKLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=\r\n server: https://127.0.0.1:37749\r\n name: k3d-demo\r\ncontexts:\r\n- context:\r\n cluster: k3d-demo\r\n user: admin@k3d-demo\r\n name: k3d-demo\r\ncurrent-context: k3d-demo\r\nkind: Config\r\npreferences: {}\r\nusers:\r\n- name: admin@k3d-demo\r\n user:\r\n client-certificate-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrRENDQVRlZ0F3SUJBZ0lJTzB3eG5RNG9oRmd3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOek0wT1RRM01qWTBNQjRYRFRJME1USXlNekE1TkRjME5Gb1hEVEkxTVRJeQpNekE1TkRjME5Gb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJEbmM0SVZDTExRVzB2VmEKdUtLbGZYdFRRRElrSFl0dFVsQkFxaFpwd0Z6d3BvaUJQaW8xZzdsRG5ja0tVNFBzS0oreG5GSnpPd0V0ZlFaaAprakZkOC9XalNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVDZSVTBOL3k3VUU0WEMxTmpCQ2xNajhsZDNzekFLQmdncWhrak9QUVFEQWdOSEFEQkUKQWlBZGpDMWU1ODBUMjZiSDluSXZoaE1pOGZoYnRQT2hzMnJ1OVV6MGZ4eExBd0lnZUNYcWNGdnFMbGs0KzJDSwpndFVwZ0txWVkxeDhEck9LZWNRMGdrUzV2QVk9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTXpRNU5EY3lOalF3SGhjTk1qUXhNakl6TURrME56UTBXaGNOTXpReE1qSXhNRGswTnpRMApXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTXpRNU5EY3lOalF3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFTYnlzbDJ2NFp6YTBna0s4cWJIU3MvUS8zQTE5US9Mek1jazM4OW5zZkYKQjExMzlPM29ITWlUTS9weDVsa09wcHAxTk1qTzB3UFpKeHp6NUNpTExSVEhvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVStrVk5EZjh1MUJPRnd0VFl3UXBUCkkvSlhkN013Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUlnUWxpcStYekRqbzZEYURIOGNabXQ2cFlyZCs2d3J2NXgKQXpVVUlZSm8zaklDSVFDU0JDR0E0NzkzQlo1M2dCcE1Qbk5xcHJ2M21lSW1UZmlJaXo2KzNWaGxHdz09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K\r\n client-key-data: LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSURhZEdTVDZSMHRTSDMrcFhPRFRzSU93RjloRzlvdFdWNjU5QlpEYlF6ZVlvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFT2R6Z2hVSXN0QmJTOVZxNG9xVjllMU5BTWlRZGkyMVNVRUNxRm1uQVhQQ21pSUUrS2pXRAp1VU9keVFwVGcrd29uN0djVW5NN0FTMTlCbUdTTVYzejlRPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo=",
"podTemplate": "apiVersion: v1\r\nkind: Pod\r\nmetadata:\r\n name: jobmanager-pod-template\r\nspec:\r\n initContainers:\r\n - name: artifacts-fetcher-dinky\r\n image: docker.1panel.live/library/busybox:latest\r\n # Use wget or other tools to get user jars from remote storage\r\n command: [ 'wget', 'http://10.13.8.216:9001/dinky-app-1.20-1.2.0-jar-with-dependencies.jar', '-O', '/flink-usrlib/dinky-app-1.20-1.2.0-jar-with-dependencies.jar' ]\r\n volumeMounts:\r\n - mountPath: /flink-usrlib\r\n name: flink-usrlib\r\n - name: artifacts-fetcher-mysql\r\n image: docker.1panel.live/library/busybox:latest\r\n # Use wget or other tools to get user jars from remote storage\r\n command: [ 'wget', 'http://10.13.8.216:9001/mysql-connector-java-8.0.30.jar', '-O', '/flink-usrlib/mysql-connector-java-8.0.30.jar' ]\r\n volumeMounts:\r\n - mountPath: /flink-usrlib\r\n name: flink-usrlib\r\n - name: artifacts-fetcher-hadoop\r\n image: docker.1panel.live/library/busybox:latest\r\n # Use wget or other tools to get user jars from remote storage\r\n command: [ 'wget', 'http://10.13.8.216:9001/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar', '-O', '/flink-usrlib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar' ]\r\n volumeMounts:\r\n - mountPath: /flink-usrlib\r\n name: flink-usrlib\r\n containers:\r\n # Do not change the main container name\r\n - name: flink-main-container\r\n # env:\r\n # - name: FLINK_CONF_DIR\r\n # value: /etc/hadoop/conf\r\n resources:\r\n requests:\r\n ephemeral-storage: 2048Mi\r\n limits:\r\n ephemeral-storage: 2048Mi\r\n volumeMounts:\r\n - mountPath: /opt/flink/usrlib\r\n name: flink-usrlib\r\n volumes:\r\n - name: flink-usrlib\r\n emptyDir: { }\r\n",
},
"clusterConfig": {
"flinkConfigPath": "/etc/hadoop/conf"
},
"flinkConfig": {
"flinkConfigList": [
{
"name": "user.artifacts.raw-http-enabled",
"value": "true"
},
{
"name": "kubernetes.taskmanager.service-account",
"value": "dinky"
},
{
"name": "kubernetes.flink.conf.dir",
"value": "/etc/hadoop/conf"
}
],
"configuration": {
"jobmanager.memory.process.size": "1024mb",
"taskmanager.memory.process.size": "1024mb"
}
},
"appConfig": {
"userJarPath": "http://10.13.8.216:9001/dinky-app-1.20-1.2.0-jar-with-dependencies.jar"
}
}
}
log.info(f"Adding k8s native application cluster, parameters:{params}")
return addApplicationCluster(session, params)
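
The kubeConfig embedded in addK8sNativeCluster is tied to one particular k3d instance: its certificates and the https://127.0.0.1:37749 API endpoint only match the cluster that generated them. One way to keep the test portable would be to read the file the k3s workflow copies to ./kube/k3s.yaml instead of hard-coding the string; a sketch only, the helper name and default path are assumptions:

# Hypothetical helper, not part of this commit: load the kube config written by
# the k3s-test workflow instead of hard-coding certificates and the API address.
from pathlib import Path

def loadKubeConfig(path: str = "./kube/k3s.yaml") -> str:
    # The raw text can be placed in params["config"]["kubernetesConfig"]["kubeConfig"].
    return Path(path).read_text()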
5 changes: 3 additions & 2 deletions e2e_test/tools/main.py
@@ -1,6 +1,6 @@
import requests

from env import addStandaloneCluster, addYarnCluster
from env import addStandaloneCluster, addYarnCluster, addK8sNativeCluster
from login import login
from task import addCatalogue, Task

@@ -9,7 +9,8 @@
login(session)
clusterId = addStandaloneCluster(session)
yarn_cluster_id = addYarnCluster(session)
k8s_native_cluster_id = addK8sNativeCluster(session)
catalogue = addCatalogue(session, "flink-sql-task")
sql = "DROP TABLE IF EXISTS source_table3;\r\nCREATE TABLE IF NOT EXISTS source_table3(\r\n--订单id\r\n`order_id` BIGINT,\r\n--产品\r\n\r\n`product` BIGINT,\r\n--金额\r\n`amount` BIGINT,\r\n\r\n--支付时间\r\n`order_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)), -- `在这里插入代码片`\r\n--WATERMARK\r\nWATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND\r\n) WITH(\r\n'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_id.min' = '1',\r\n 'fields.order_id.max' = '2',\r\n 'fields.amount.min' = '1',\r\n 'fields.amount.max' = '10',\r\n 'fields.product.min' = '1',\r\n 'fields.product.max' = '2'\r\n);\r\n\r\n-- SELECT * FROM source_table3 LIMIT 10;\r\n\r\nDROP TABLE IF EXISTS sink_table5;\r\nCREATE TABLE IF NOT EXISTS sink_table5(\r\n--产品\r\n`product` BIGINT,\r\n--金额\r\n`amount` BIGINT,\r\n--支付时间\r\n`order_time` TIMESTAMP(3),\r\n--1分钟时间聚合总数\r\n`one_minute_sum` BIGINT\r\n) WITH(\r\n'connector'='print'\r\n);\r\n\r\nINSERT INTO sink_table5\r\nSELECT\r\nproduct,\r\namount,\r\norder_time,\r\nSUM(amount) OVER(\r\nPARTITION BY product\r\nORDER BY order_time\r\n-- 标识统计范围是1个 product 的最近 1 分钟的数据\r\nRANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW\r\n) as one_minute_sum\r\nFROM source_table3;"
flink_sql_datagen_test = Task(session, clusterId, yarn_cluster_id, catalogue.id, "flink-sql-datagen-test", sql)
flink_sql_datagen_test = Task(session, clusterId, yarn_cluster_id, k8s_native_cluster_id, catalogue.id, "flink-sql-datagen-test", sql)
flink_sql_datagen_test.runFlinkTask(wait_time=10, is_async=True)
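
Both addYarnCluster and addK8sNativeCluster are typed Optional[int], so they return None when the saved configuration does not show up in the list response, and main.py passes the value straight into Task. A guard along these lines (not part of the commit) would surface that earlier:

# Hypothetical guard, not part of this commit: fail fast instead of handing
# None to Task when a cluster configuration was not found after saving.
if yarn_cluster_id is None or k8s_native_cluster_id is None:
    raise Exception("an application cluster configuration was not found after saving")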
8 changes: 6 additions & 2 deletions e2e_test/tools/task.py
@@ -19,18 +19,20 @@ class FlinkRunMode(Enum):
LOCAL = "local"
STANDALONE = "standalone"
YARN_APPLICATION = "yarn-application"
KUBERNETES_APPLICATION = "kubernetes-application"

@staticmethod
def getAllMode():
return [FlinkRunMode.LOCAL, FlinkRunMode.STANDALONE, FlinkRunMode.YARN_APPLICATION]
        return [FlinkRunMode.LOCAL, FlinkRunMode.STANDALONE, FlinkRunMode.YARN_APPLICATION, FlinkRunMode.KUBERNETES_APPLICATION]


class Task:
def __init__(self, session: requests.Session, cluster_id: int, yarn_cluster_id: int, parent_id: int, name: str,
    def __init__(self, session: requests.Session, cluster_id: int, yarn_cluster_id: int, k8s_native_cluster_id: int, parent_id: int, name: str,
statement):
self.session = session
self.cluster_id = cluster_id
self.yarn_cluster_id = yarn_cluster_id
self.k8s_native_cluster_id = k8s_native_cluster_id
self.parent_id = parent_id
self.name = name
self.statement = statement
@@ -69,6 +71,8 @@ def addTask(self, name: str, parent_id: int = 0, dialect: str = "FlinkSql",
params["task"]["clusterId"] = self.cluster_id
elif run_model == FlinkRunMode.YARN_APPLICATION:
params["task"]["clusterConfigurationId"] = self.yarn_cluster_id
elif run_model == FlinkRunMode.KUBERNETES_APPLICATION:
params["task"]["clusterConfigurationId"] = self.k8s_native_cluster_id
add_parent_dir_resp = session.put(url("api/catalogue/saveOrUpdateCatalogueAndTask"), json=params)
assertRespOk(add_parent_dir_resp, "Create a task")
get_all_tasks_resp = session.post(url("api/catalogue/getCatalogueTreeData"), json={
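
addTask now branches three ways on run_model to pick the cluster field for the request. If further application modes are added, the same mapping could be kept in one place with a small helper; a sketch only, the helper is hypothetical and simply reuses the attribute names from __init__ above:

# Hypothetical helper, not part of this commit: map each run mode to the
# request field and cluster id it needs, so new modes only touch this table.
def clusterParams(self, run_model: FlinkRunMode) -> dict:
    if run_model == FlinkRunMode.STANDALONE:
        return {"clusterId": self.cluster_id}
    if run_model == FlinkRunMode.YARN_APPLICATION:
        return {"clusterConfigurationId": self.yarn_cluster_id}
    if run_model == FlinkRunMode.KUBERNETES_APPLICATION:
        return {"clusterConfigurationId": self.k8s_native_cluster_id}
    return {}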
