diff --git a/.github/workflows/backend.yaml b/.github/workflows/backend.yaml index 2e35ba4bc3..c1f23eb91d 100644 --- a/.github/workflows/backend.yaml +++ b/.github/workflows/backend.yaml @@ -19,6 +19,7 @@ name: Backend on: push: + workflow_dispatch: pull_request: branches: - dev @@ -57,17 +58,17 @@ jobs: frontend: - 'dinky-web/**' - uses: actions/setup-node@v3 - if: steps.filter.outputs.frontend == 'true' + if: steps.filter.outputs.frontend == 'true' with: node-version: 16 - name: Get npm cache directory id: npm-cache-dir - if: steps.filter.outputs.frontend == 'true' + if: steps.filter.outputs.frontend == 'true' run: | echo "::set-output name=dir::$(npm config get cache)" - uses: actions/cache@v3 id: npm-cache # use this to check for `cache-hit` ==> if: steps.npm-cache.outputs.cache-hit != 'true' - if: steps.filter.outputs.frontend == 'true' + if: steps.filter.outputs.frontend == 'true' with: path: | ${{ steps.npm-cache-dir.outputs.dir }} @@ -76,20 +77,25 @@ jobs: restore-keys: | ${{ runner.os }}-node- - name: Install Dependencies - if: steps.filter.outputs.frontend == 'true' + if: steps.filter.outputs.frontend == 'true' run: cd dinky-web && npm install --no-audit --progress=false --legacy-peer-deps - name: Npm Web Build - if: steps.filter.outputs.frontend == 'true' + if: steps.filter.outputs.frontend == 'true' run: cd dinky-web && npm run build - build_jdk: - name: Build_JDK + - name: Upload artifact + uses: actions/upload-artifact@v4 + with: + name: dinky-web + path: ./dinky-web/dist + build_release: + name: Build Release runs-on: ubuntu-latest needs: check strategy: fail-fast: true matrix: - jdk: [8, 11] - flink: ['1.14', '1.15', '1.16', '1.17', '1.18', '1.19', '1.20'] + jdk: [ 8, 11 ] + flink: [ '1.14', '1.15', '1.16', '1.17', '1.18', '1.19', '1.20' ] timeout-minutes: 30 env: @@ -100,7 +106,7 @@ jobs: - name: Set up JDK ${{ matrix.jdk }} uses: actions/setup-java@v2 with: - java-version: ${{ matrix.jdk }} + java-version: ${{ matrix.jdk }} distribution: 'adopt' - name: Cache local Maven repository uses: actions/cache@v3 @@ -132,6 +138,87 @@ jobs: - name: Check package size run: | ./check_package_size.sh -# - name: Run Unit tests -# run: | -# ./mvnw -T 2C -B clean verify -Dmaven.test.skip=false -Dgpg.skip=true --no-snapshot-updates + - name: Upload artifact + uses: actions/upload-artifact@v4 + if: ${{ matrix.jdk == 8 }} + with: + name: dinky-release-${{ matrix.flink }} + path: ./build/dinky-release*.tar.gz + # - name: Run Unit tests + # run: | + # ./mvnw -T 2C -B clean verify -Dmaven.test.skip=false -Dgpg.skip=true --no-snapshot-updates + + + run_e2e: + name: Run E2E + needs: build_release + strategy: + fail-fast: true + matrix: + flink: [ '1.14', '1.15', '1.16', '1.17', '1.18', '1.19', '1.20' ] + runs-on: ubuntu-latest + services: + registry: + image: registry:2 + ports: + - 5000:5000 + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Download artifact + uses: actions/download-artifact@v4 + with: + name: dinky-release-${{ matrix.flink }} + path: ./build + # Set up QEMU, which docker buildx below depends on. + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + # Set up Docker Buildx to build multi-platform images + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + with: + driver-opts: network=host + - name: Build Dinky Image + uses: docker/build-push-action@v5 + with: + context: . 
+ file: ./e2e_test/docker-compose-env/Dockerfile + # Whether to push the image (to the local registry) + push: true + build-args: | + FLINK_VERSION=${{ matrix.flink }} + tags: | + localhost:5000/dinky/dinky-test:flink + - name: Init Env Jar + run: | + wget -O e2e_test/docker-compose-env/dinky/mysql-connector-java-8.0.30.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar && + wget -O e2e_test/docker-compose-env/flink/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar && + wget -O e2e_test/docker-compose-env/dinky/javax.ws.rs-api-2.1.1.jar https://repo1.maven.org/maven2/javax/ws/rs/javax.ws.rs-api/2.1.1/javax.ws.rs-api-2.1.1.jar + - name: Init Docker Network + run: | + docker network create -d bridge dinky_net + - name: Init Run Docker MySQL + uses: hoverkraft-tech/compose-action@v2.0.2 + with: + compose-file: ./e2e_test/docker-compose-env/mysql/docker-compose.yml + - name: Init Run Docker Dinky + uses: hoverkraft-tech/compose-action@v2.0.2 + with: + compose-file: ./e2e_test/docker-compose-env/dinky/docker-compose.yml + - name: Init Run Docker Hadoop + uses: hoverkraft-tech/compose-action@v2.0.2 + with: + compose-file: ./e2e_test/docker-compose-env/hadoop/docker-compose.yml + - name: Replace Flink docker-compose yml + run: | + export FLINK_VERSION=${{ matrix.flink }} && envsubst < ./e2e_test/docker-compose-env/flink/docker-compose.yml > ./e2e_test/docker-compose-env/flink/docker-compose-${{ matrix.flink }}.yml + - name: Init Run Docker Flink + uses: hoverkraft-tech/compose-action@v2.0.2 + with: + compose-file: ./e2e_test/docker-compose-env/flink/docker-compose-${{ matrix.flink }}.yml + + - name: Copy Flink Jar Deps + run: docker cp dinky:/opt/dinky/ ./dinky-release + - name: Run Docker Python Script + run: | + docker run -v ./dinky-release/extends/flink${{ matrix.flink }}:/flink/lib -v ./e2e_test/docker-compose-env/dinky/mysql-connector-java-8.0.30.jar:/flink/lib/mysql-connector-java-8.0.30.jar -v ./e2e_test/docker-compose-env/flink/conf:/flink/conf -v ./dinky-release/jar:/dinky/jar -v ./e2e_test/tools:/app -w /app --net dinky_net --rm --entrypoint /bin/bash python:3.9 -c 'pip install -r requirements.txt && python main.py dinky:8888' diff --git a/.github/workflows/docker_build.yaml b/.github/workflows/docker_build.yaml index 7b9b6925af..0333ed3dd1 100644 --- a/.github/workflows/docker_build.yaml +++ b/.github/workflows/docker_build.yaml @@ -34,7 +34,7 @@ jobs: strategy: fail-fast: true matrix: - flink: [1.14,1.15,1.16,1.17,1.18,1.19] + flink: ["1.14","1.15","1.16","1.17","1.18","1.19","1.20"] steps: # git checkout 代码 - name: Checkout diff --git a/.github/workflows/e2e_test.yml b/.github/workflows/e2e_test.yml deleted file mode 100644 index f3e0b82930..0000000000 --- a/.github/workflows/e2e_test.yml +++ /dev/null @@ -1,83 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -name: E2E Test - -on: - workflow_dispatch: - -jobs: - init_env: - name: init env - runs-on: ubuntu-latest - services: - registry: - image: registry:2 - ports: - - 5000:5000 - steps: - - name: Checkout - uses: actions/checkout@v4 - - name: Init Env Jar - run: | - wget -O e2e_test/docker-compose-env/dinky/mysql-connector-java-8.0.30.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar && - wget -O e2e_test/docker-compose-env/flink/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar - - name: Init Docker Network - run: | - docker network create -d bridge dinky_net - - name: Init Run Docker MySQL - uses: hoverkraft-tech/compose-action@v2.0.2 - with: - compose-file: ./e2e_test/docker-compose-env/mysql/docker-compose.yml - - name: Init Run Docker Hadoop - uses: hoverkraft-tech/compose-action@v2.0.2 - with: - compose-file: ./e2e_test/docker-compose-env/hadoop/docker-compose.yml - # 设置 QEMU, 后面 docker buildx 依赖此. - - name: Init Run Docker Flink - uses: hoverkraft-tech/compose-action@v2.0.2 - with: - compose-file: ./e2e_test/docker-compose-env/flink/docker-compose.yml - # 设置 QEMU, 后面 docker buildx 依赖此. - - name: Set up QEMU - uses: docker/setup-qemu-action@v3 - # 设置 Docker buildx, 方便构建 Multi platform 镜像 - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - with: - driver-opts: network=host - - name: Build Dinky Image - uses: docker/build-push-action@v5 - with: - file: ./deploy/docker/Dockerfile - # 是否 docker push - push: true - build-args: | - FLINK_VERSION=1.14 - DINKY_VERSION=1 - tags: | - localhost:5000/dinky/dinky-test:flink-1.14 - - name: Init Run Docker Dinky - uses: hoverkraft-tech/compose-action@v2.0.2 - with: - compose-file: ./e2e_test/docker-compose-env/dinky/docker-compose.yml - - name: Run Docker Python Script -# uses: hoverkraft-tech/compose-action@v2.0.2 -# with: -# compose-file: ./e2e_test/tools/docker-compose.yml - run: | - sleep 30 && cd ./e2e_test/tools && docker run -v ./:/app -w /app --net dinky_net --rm --entrypoint /bin/bash python:3.9 -c 'pip install -r requirements.txt && python main.py dinky14:8888' diff --git a/e2e_test/docker-compose-env/Dockerfile b/e2e_test/docker-compose-env/Dockerfile new file mode 100755 index 0000000000..1a80fe1b39 --- /dev/null +++ b/e2e_test/docker-compose-env/Dockerfile @@ -0,0 +1,28 @@ +ARG FLINK_VERSION + +FROM flink:${FLINK_VERSION}-scala_2.12-java8 as flink-base + + +FROM eclipse-temurin:8-jre-jammy + +ARG FLINK_VERSION +ENV FLINK_VERSION=${FLINK_VERSION} +ENV DINKY_HOME=/opt/dinky/ +ENV H2_DB=./tmp/db/h2 +ADD build/dinky-release*.tar.gz /opt +RUN ls /opt && mv /opt/dinky-release* /opt/dinky + + +WORKDIR /opt/dinky/ + +USER root +COPY --from=flink-base /opt/flink/lib/*.jar /opt/dinky/extends/flink${FLINK_VERSION}/flink/ +RUN rm -f /opt/dinky/extends/flink${FLINK_VERSION}/flink/flink-table-planner-loader*.jar + +COPY --from=flink-base /opt/flink/opt/flink-table-planner*.jar /opt/dinky/extends/flink${FLINK_VERSION}/flink/ + +RUN mkdir 
/opt/dinky/customJar && chmod -R 777 /opt/dinky/ && sed -i 's/-Xms512M -Xmx2048M -XX:PermSize=512M/-XX:+UseContainerSupport -XX:InitialRAMPercentage=70.0 -XX:MaxRAMPercentage=70.0/g' ./bin/auto.sh + +EXPOSE 8888 + +CMD ./bin/auto.sh startOnPending diff --git a/e2e_test/docker-compose-env/dinky/docker-compose.yml b/e2e_test/docker-compose-env/dinky/docker-compose.yml index f73d246fde..5de79aa65a 100644 --- a/e2e_test/docker-compose-env/dinky/docker-compose.yml +++ b/e2e_test/docker-compose-env/dinky/docker-compose.yml @@ -3,9 +3,10 @@ networks: dinky_net: external: true services: - dinky14: + dinky: + container_name: dinky restart: always - image: localhost:5000/dinky/dinky-test:flink-1.14 + image: localhost:5000/dinky/dinky-test:flink environment: - DB_ACTIVE=mysql - MYSQL_ADDR=mysql:3306 @@ -14,6 +15,9 @@ services: - MYSQL_PASSWORD=dinky volumes: - ./mysql-connector-java-8.0.30.jar:/opt/dinky/lib/mysql-connector-java-8.0.30.jar + - ./javax.ws.rs-api-2.1.1.jar:/opt/dinky/lib/javax.ws.rs-api-2.1.1.jar - ../flink/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/dinky/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar + - ../flink/conf/log4j-console.properties:/flink/conf/log4j-console.properties + - ../hadoop:/flink/conf networks: - dinky_net diff --git a/e2e_test/docker-compose-env/flink/conf/flink-conf.yaml b/e2e_test/docker-compose-env/flink/conf/flink-conf.yaml index 597f641cab..82606aa2cd 100644 --- a/e2e_test/docker-compose-env/flink/conf/flink-conf.yaml +++ b/e2e_test/docker-compose-env/flink/conf/flink-conf.yaml @@ -272,8 +272,5 @@ classloader.resolve-order: parent-first # Interval in milliseconds for refreshing the monitored directories. #historyserver.archive.fs.refresh-interval: 10000 -blob.server.port: 6124 -query.server.port: 6125 - jobmanager.rpc.address: jobmanager env.java.opts: "-Dfile.encoding=UTF-8" diff --git a/e2e_test/docker-compose-env/flink/docker-compose.yml b/e2e_test/docker-compose-env/flink/docker-compose.yml index 4ca9864718..cd300ec22a 100644 --- a/e2e_test/docker-compose-env/flink/docker-compose.yml +++ b/e2e_test/docker-compose-env/flink/docker-compose.yml @@ -4,8 +4,10 @@ networks: external: true services: jobmanager: + hostname: jobmanager + container_name: jobmanager restart: always - image: flink:1.14.6 + image: flink:${FLINK_VERSION}-scala_2.12-java8 command: jobmanager environment: - HADOOP_CONF_DIR=/opt/flink/conf @@ -15,7 +17,9 @@ services: networks: - dinky_net taskmanager: - image: flink:1.14.6 + hostname: taskmanager + container_name: taskmanager + image: flink:${FLINK_VERSION}-scala_2.12-java8 command: taskmanager environment: - HADOOP_CONF_DIR=/opt/flink/conf @@ -24,3 +28,5 @@ services: - ./flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar networks: - dinky_net + depends_on: + - jobmanager diff --git a/e2e_test/docker-compose-env/hadoop/flink-conf.yaml b/e2e_test/docker-compose-env/hadoop/flink-conf.yaml new file mode 100644 index 0000000000..0f8b89dab7 --- /dev/null +++ b/e2e_test/docker-compose-env/hadoop/flink-conf.yaml @@ -0,0 +1,281 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + + +#============================================================================== +# Common +#============================================================================== + +# The external address of the host on which the JobManager runs and can be +# reached by the TaskManagers and any clients which want to connect. This setting +# is only used in Standalone mode and may be overwritten on the JobManager side +# by specifying the --host parameter of the bin/jobmanager.sh executable. +# In high availability mode, if you use the bin/start-cluster.sh script and setup +# the conf/masters file, this will be taken care of automatically. Yarn +# automatically configure the host name based on the hostname of the node where the +# JobManager runs. + + +# The RPC port where the JobManager is reachable. + +jobmanager.rpc.port: 6123 + + +# The total process memory size for the JobManager. +# +# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead. + +jobmanager.memory.process.size: 1024m + + +# The total process memory size for the TaskManager. +# +# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead. + +taskmanager.memory.process.size: 1024m + +# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'. +# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory. +# +# taskmanager.memory.flink.size: 1280m + +# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. + +taskmanager.numberOfTaskSlots: 1 + +# taskmanager.cpu.cores: 1 +# taskmanager.memory.task.heap.size: 1200m +# taskmanager.memory.managed.size: 0 +# taskmanager.memory.network.fraction: 0.1 +# taskmanager.memory.network.min: 64mb +# taskmanager.memory.network.max: 64mb + +# The parallelism used for programs that did not specify and other parallelism. + +parallelism.default: 1 + +# The default file system scheme and authority. +# +# By default file paths without scheme are interpreted relative to the local +# root file system 'file:///'. Use this to override the default and interpret +# relative paths relative to a different file system, +# for example 'hdfs://mynamenode:12345' +# +# fs.default-scheme + +#============================================================================== +# High Availability +#============================================================================== + +# The high-availability mode. Possible options are 'NONE' or 'zookeeper'. +# +# high-availability: zookeeper + +# The path where metadata for master recovery is persisted. While ZooKeeper stores +# the small ground truth for checkpoint and leader election, this location stores +# the larger objects, like persisted dataflow graphs. +# +# Must be a durable file system that is accessible from all nodes +# (like HDFS, S3, Ceph, nfs, ...) 
+# +# high-availability.storageDir: hdfs:///flink/ha/ + +# The list of ZooKeeper quorum peers that coordinate the high-availability +# setup. This must be a list of the form: +# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181) +# +# high-availability.zookeeper.quorum: localhost:2181 + + +# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes +# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE) +# The default value is "open" and it can be changed to "creator" if ZK security is enabled +# +# high-availability.zookeeper.client.acl: open + +#============================================================================== +# Fault tolerance and checkpointing +#============================================================================== + +# The backend that will be used to store operator state checkpoints if +# checkpointing is enabled. Checkpointing is enabled when execution.checkpointing.interval > 0. +# +# Execution checkpointing related parameters. Please refer to CheckpointConfig and ExecutionCheckpointingOptions for more details. +# +# execution.checkpointing.interval: 3min +# execution.checkpointing.externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION] +# execution.checkpointing.max-concurrent-checkpoints: 1 +# execution.checkpointing.min-pause: 0 +# execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE] +# execution.checkpointing.timeout: 10min +# execution.checkpointing.tolerable-failed-checkpoints: 0 +# execution.checkpointing.unaligned: false +# +# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the +# . +# +# state.backend: filesystem + +# Directory for checkpoints filesystem, when using any of the default bundled +# state backends. +# +# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints + +# Default target directory for savepoints, optional. +# +# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints + +# Flag to enable/disable incremental checkpoints for backends that +# support incremental checkpoints (like the RocksDB state backend). +# +# state.backend.incremental: false + +# The failover strategy, i.e., how the job computation recovers from task failures. +# Only restart tasks that may have been affected by the task failure, which typically includes +# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption. + +jobmanager.execution.failover-strategy: region + +#============================================================================== +# Rest & web frontend +#============================================================================== + +# The port to which the REST client connects to. If rest.bind-port has +# not been specified, then the server will bind to this port as well. +# +#rest.port: 8282 + +# The address to which the REST client will connect to +# +#rest.address: 0.0.0.0 + +# Port range for the REST and web server to bind to. +# +#rest.bind-port: 50000-50100 + +# The address that the REST & web server binds to +# +#rest.bind-address: 0.0.0.0 + +# Flag to specify whether job submission is enabled from the web-based +# runtime monitor. Uncomment to disable. + +#web.submit.enable: false + +# Flag to specify whether job cancellation is enabled from the web-based +# runtime monitor. Uncomment to disable. 
+ +#web.cancel.enable: false + +#============================================================================== +# Advanced +#============================================================================== + +# Override the directories for temporary files. If not specified, the +# system-specific Java temporary directory (java.io.tmpdir property) is taken. +# +# For framework setups on Yarn, Flink will automatically pick up the +# containers' temp directories without any need for configuration. +# +# Add a delimited list for multiple directories, using the system directory +# delimiter (colon ':' on unix) or a comma, e.g.: +# /data1/tmp:/data2/tmp:/data3/tmp +# +# Note: Each directory entry is read from and written to by a different I/O +# thread. You can include the same directory multiple times in order to create +# multiple I/O threads against that directory. This is for example relevant for +# high-throughput RAIDs. +# +# io.tmp.dirs: /tmp + +# The classloading resolve order. Possible values are 'child-first' (Flink's default) +# and 'parent-first' (Java's default). +# +# Child first classloading allows users to use different dependency/library +# versions in their application than those in the classpath. Switching back +# to 'parent-first' may help with debugging dependency issues. +# +# classloader.resolve-order: child-first +classloader.resolve-order: parent-first + +# The amount of memory going to the network stack. These numbers usually need +# no tuning. Adjusting them may be necessary in case of an "Insufficient number +# of network buffers" error. The default min is 64MB, the default max is 1GB. +# +# taskmanager.memory.network.fraction: 0.1 +# taskmanager.memory.network.min: 64mb +# taskmanager.memory.network.max: 1gb + +#============================================================================== +# Flink Cluster Security Configuration +#============================================================================== + +# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors - +# may be enabled in four steps: +# 1. configure the local krb5.conf file +# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit) +# 3. make the credentials available to various JAAS login contexts +# 4. configure the connector to use JAAS/SASL + +# The below configure how Kerberos credentials are provided. A keytab will be used instead of +# a ticket cache if the keytab path and principal are set. 
+ +# security.kerberos.login.use-ticket-cache: true +# security.kerberos.login.keytab: /path/to/kerberos/keytab +# security.kerberos.login.principal: flink-user + +# The configuration below defines which JAAS login contexts + +# security.kerberos.login.contexts: Client,KafkaClient + +#============================================================================== +# ZK Security Configuration +#============================================================================== + +# Below configurations are applicable if ZK ensemble is configured for security + +# Override below configuration to provide custom ZK service name if configured +# zookeeper.sasl.service-name: zookeeper + +# The configuration below must match one of the values set in "security.kerberos.login.contexts" +# zookeeper.sasl.login-context-name: Client + +#============================================================================== +# HistoryServer +#============================================================================== + +# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop) + +# Directory to upload completed jobs to. Add this directory to the list of +# monitored directories of the HistoryServer as well (see below). +#jobmanager.archive.fs.dir: hdfs:///completed-jobs/ + +# The address under which the web-based HistoryServer listens. +#historyserver.web.address: 0.0.0.0 + +# The port under which the web-based HistoryServer listens. +#historyserver.web.port: 8082 + +# Comma separated list of directories to monitor for completed jobs. +#historyserver.archive.fs.dir: hdfs:///completed-jobs/ + +# Interval in milliseconds for refreshing the monitored directories. +#historyserver.archive.fs.refresh-interval: 10000 + +jobmanager.rpc.address: jobmanager +env.java.opts: "-Dfile.encoding=UTF-8" diff --git a/e2e_test/docker-compose-env/mysql/docker-compose.yml b/e2e_test/docker-compose-env/mysql/docker-compose.yml index 7910cdb48e..e9f9ea1c8d 100644 --- a/e2e_test/docker-compose-env/mysql/docker-compose.yml +++ b/e2e_test/docker-compose-env/mysql/docker-compose.yml @@ -4,6 +4,7 @@ networks: external: true services: mysql: + container_name: mysql restart: always image: mysql:5.7 volumes: diff --git a/e2e_test/tools/config.py b/e2e_test/tools/config.py new file mode 100644 index 0000000000..c006cd4d9c --- /dev/null +++ b/e2e_test/tools/config.py @@ -0,0 +1,9 @@ +# standalone +standalone_address = "jobmanager:8282" + +# yarn +yarn_flink_lib="/flink/lib" +yarn_flink_conf="/flink/conf" +yarn_hadoop_conf="/flink/conf" +yarn_dinky_app_jar="/dinky/jar" + diff --git a/e2e_test/tools/docker-compose.yml b/e2e_test/tools/docker-compose.yml deleted file mode 100644 index 7eb73f8cd2..0000000000 --- a/e2e_test/tools/docker-compose.yml +++ /dev/null @@ -1,13 +0,0 @@ -version: "3" -networks: - dinky_net: - external: true -services: - python-script: - image: python:3.8 - volumes: - - ./:/app - working_dir: /app - command: 'pip install -r requirements.txt && python main.py dinky14:8888' - networks: - - dinky_net diff --git a/e2e_test/tools/env.py b/e2e_test/tools/env.py index 3cade9469c..d68e663a8a 100644 --- a/e2e_test/tools/env.py +++ b/e2e_test/tools/env.py @@ -1,10 +1,13 @@ from requests import Session import urllib.parse as urlparse +from hdfs.client import Client +import os +from config import * +from httpUtil import assertRespOk, url +from logger import log -from login import url, assertRespOk - -def addCluster(session: Session) -> int: +def addStandaloneCluster(session: Session) -> int: """ en: Add a cluster 
 instance zh: 添加一个集群实例 @@ -15,7 +18,7 @@ def addCluster(session: Session) -> int: add_cluster_resp = session.put(url("api/cluster"), json={ "name": name, "type": "standalone", - "hosts": "jobmanager:8282" + "hosts": standalone_address }) assertRespOk(add_cluster_resp, "Add cluster") get_data_list = session.get(url(f"api/cluster/list?searchKeyWord={urlparse.quote(name)}&isAutoCreate=false")) @@ -24,3 +27,57 @@ if data['name'] == name: return data['id'] raise Exception(f"Cluster {name} not found") + + +def addYarnCluster(session: Session) -> int: + client = Client("http://namenode:9870") + flink_lib_path = yarn_flink_lib + client.makedirs(flink_lib_path) + # Traverse the specified path and upload the file to HDFS + for root, dirs, files in os.walk(flink_lib_path): + for file in files: + filepath = os.path.join(root, file) + client.upload(flink_lib_path + "/" + file, filepath) + jar_path = yarn_dinky_app_jar + client.makedirs(jar_path) + for root, dirs, files in os.walk(jar_path): + for file in files: + if file.endswith(".jar") and "dinky-app" in file: + filepath = os.path.join(root, file) + jar_path = filepath + client.upload(jar_path, filepath) + name = "yarn-test" + params = { + "type": "yarn-application", + "name": name, + "enabled": True, + "config": { + "clusterConfig": { + "hadoopConfigPath": yarn_hadoop_conf, + "flinkLibPath": "hdfs://" + flink_lib_path, + "flinkConfigPath": yarn_flink_conf + }, + "flinkConfig": { + "configuration": { + "jobmanager.memory.process.size": "1024m", + "taskmanager.memory.process.size": "1024m", + "taskmanager.numberOfTaskSlots": "1", + "state.savepoints.dir": "hdfs:///flink/savepoint", + "state.checkpoints.dir": "hdfs:///flink/ckp" + } + }, + "appConfig": { + "userJarPath": "hdfs://" + jar_path + } + } + } + log.info(f"Adding yarn application cluster, parameters: {params}") + test_connection_yarn_resp = session.post(url("api/clusterConfiguration/testConnect"), json=params) + assertRespOk(test_connection_yarn_resp, "Test yarn connectivity") + test_connection_yarn_resp = session.put(url("api/clusterConfiguration/saveOrUpdate"), json=params) + assertRespOk(test_connection_yarn_resp, "Add Yarn Application Cluster") + get_app_list = session.get(url(f"api/clusterConfiguration/list?keyword={name}"), json=params) + assertRespOk(get_app_list, "Get Yarn Application Cluster") + for data in get_app_list.json()["data"]: + if data["name"] == name: + return data['id'] diff --git a/e2e_test/tools/httpUtil.py b/e2e_test/tools/httpUtil.py new file mode 100644 index 0000000000..d491a3a462 --- /dev/null +++ b/e2e_test/tools/httpUtil.py @@ -0,0 +1,23 @@ +import sys +from logger import log +from requests import Response +from json import JSONDecodeError + +dinky_addr = sys.argv[1] +log.info(f"The address of the current request: {dinky_addr}") + + +def url(path: str): + return rf"http://{dinky_addr}/{path}" + + +def assertRespOk(resp: Response, api_name: str): + if resp.status_code != 200: + raise AssertionError(f"api name: {api_name} request failed") + else: + try: + resp_json = resp.json() + if not resp_json["success"]: + raise AssertionError(f"api name: {api_name} request failed. Error: {resp_json['msg']}") + except JSONDecodeError: + raise AssertionError(f"api name: {api_name} request failed. Error: {resp.content.decode()}") diff --git a/e2e_test/tools/login.py b/e2e_test/tools/login.py index 041daff95c..676cf8c4c6 100644 --- a/e2e_test/tools/login.py +++ b/e2e_test/tools/login.py @@ -1,39 +1,10 @@ import sys -from json import 
JSONDecodeError import requests from requests import Response -from logger import log - -# # 创建一个logger对象 -# log = logging.getLogger(__name__) -# # 创建一个控制台处理器 -# console_handler = logging.StreamHandler() -# # 将控制台处理器添加到logger对象中 -# log.addHandler(console_handler) -# -# # 设置控制台处理器的输出格式 -# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') -# console_handler.setFormatter(formatter) -dinky_addr = sys.argv[1] - -log.info(f"The address of the current request:{dinky_addr}") - -def url(path: str): - return rf"http://{dinky_addr}/{path}" - - -def assertRespOk(resp: Response, api_name: str): - if resp.status_code != 200: - raise AssertionError("api name:{api_name} request failed") - else: - try: - resp_json = resp.json() - if not resp_json["success"]: - raise AssertionError(f"api name:{api_name} request failed.Error: {resp_json['msg']}") - except JSONDecodeError as e: - raise AssertionError(f"api name:{api_name} request failed.Error: {resp.content.decode()}") +from httpUtil import assertRespOk, url +from logger import log def login(session: requests.Session): diff --git a/e2e_test/tools/main.py b/e2e_test/tools/main.py index 50a8ad1c37..5f18e656d7 100644 --- a/e2e_test/tools/main.py +++ b/e2e_test/tools/main.py @@ -1,14 +1,15 @@ import requests -from env import addCluster +from env import addStandaloneCluster, addYarnCluster from login import login -from task import addCatalogue, runFlinkLocalTask, runFlinkSessionTask +from task import addCatalogue, Task if __name__ == '__main__': session = requests.session() login(session) - clusterId = addCluster(session) + clusterId = addStandaloneCluster(session) + yarn_cluster_id = addYarnCluster(session) catalogue = addCatalogue(session, "flink-sql-task") sql = "DROP TABLE IF EXISTS source_table3;\r\nCREATE TABLE IF NOT EXISTS source_table3(\r\n--订单id\r\n`order_id` BIGINT,\r\n--产品\r\n\r\n`product` BIGINT,\r\n--金额\r\n`amount` BIGINT,\r\n\r\n--支付时间\r\n`order_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)), -- `在这里插入代码片`\r\n--WATERMARK\r\nWATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND\r\n) WITH(\r\n'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_id.min' = '1',\r\n 'fields.order_id.max' = '2',\r\n 'fields.amount.min' = '1',\r\n 'fields.amount.max' = '10',\r\n 'fields.product.min' = '1',\r\n 'fields.product.max' = '2'\r\n);\r\n\r\n-- SELECT * FROM source_table3 LIMIT 10;\r\n\r\nDROP TABLE IF EXISTS sink_table5;\r\nCREATE TABLE IF NOT EXISTS sink_table5(\r\n--产品\r\n`product` BIGINT,\r\n--金额\r\n`amount` BIGINT,\r\n--支付时间\r\n`order_time` TIMESTAMP(3),\r\n--1分钟时间聚合总数\r\n`one_minute_sum` BIGINT\r\n) WITH(\r\n'connector'='print'\r\n);\r\n\r\nINSERT INTO sink_table5\r\nSELECT\r\nproduct,\r\namount,\r\norder_time,\r\nSUM(amount) OVER(\r\nPARTITION BY product\r\nORDER BY order_time\r\n-- 标识统计范围是1个 product 的最近 1 分钟的数据\r\nRANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW\r\n) as one_minute_sum\r\nFROM source_table3;" - runFlinkLocalTask(session, catalogue.id, "flink-sql-datagen-test",sql ) - runFlinkSessionTask(session, catalogue.id, clusterId, "flink-sql-datagen-test-session", sql) + flink_sql_datagen_test = Task(session, clusterId, yarn_cluster_id, catalogue.id, "flink-sql-datagen-test", sql) + flink_sql_datagen_test.runFlinkTask(wait_time=10, is_async=True) diff --git a/e2e_test/tools/requirements.txt b/e2e_test/tools/requirements.txt index fb1f1e2c34..f5fc75eec5 100644 Binary files a/e2e_test/tools/requirements.txt and b/e2e_test/tools/requirements.txt differ diff --git 
a/e2e_test/tools/task.py b/e2e_test/tools/task.py index a7c781ba03..07b52db7b3 100644 --- a/e2e_test/tools/task.py +++ b/e2e_test/tools/task.py @@ -1,23 +1,140 @@ from time import sleep - +from enum import Enum import requests +import concurrent.futures from login import assertRespOk, url from logger import log class CatalogueTree: - def __init__(self, id: int, name: str, taskId: int, children): + def __init__(self, id: int, name: str, task_id: int, children): self.id = id self.name = name - self.taskId = taskId + self.task_id = task_id self.children: list[CatalogueTree] = children +class FlinkRunMode(Enum): + LOCAL = "local" + STANDALONE = "standalone" + YARN_APPLICATION = "yarn-application" + + @staticmethod + def getAllMode(): + return [FlinkRunMode.LOCAL, FlinkRunMode.STANDALONE, FlinkRunMode.YARN_APPLICATION] + + +class Task: + def __init__(self, session: requests.Session, cluster_id: int, yarn_cluster_id: int, parent_id: int, name: str, + statement): + self.session = session + self.cluster_id = cluster_id + self.yarn_cluster_id = yarn_cluster_id + self.parent_id = parent_id + self.name = name + self.statement = statement + + def addTask(self, name: str, parent_id: int = 0, dialect: str = "FlinkSql", + statement: str = "", run_model: FlinkRunMode = FlinkRunMode.LOCAL) -> CatalogueTree: + """ + en: Add a task + zh: 添加一个任务 + :param name: task name + :param parent_id: dir id + :param statement: statement + :return CatalogueTree + """ + model_str = run_model.value + session = self.session + params = { + "name": name, + "type": dialect, + "firstLevelOwner": 1, + "task": { + "savePointStrategy": 0, + "parallelism": 1, + "envId": -1, + "step": 1, + "alertGroupId": -1, + "type": model_str, + "dialect": dialect, + "statement": statement, + "firstLevelOwner": 1, + }, + "isLeaf": False, + "parentId": parent_id + } + if run_model == FlinkRunMode.STANDALONE: + params["task"]["clusterId"] = self.cluster_id + elif run_model == FlinkRunMode.YARN_APPLICATION: + params["task"]["clusterConfigurationId"] = self.yarn_cluster_id + add_parent_dir_resp = session.put(url("api/catalogue/saveOrUpdateCatalogueAndTask"), json=params) + assertRespOk(add_parent_dir_resp, "Create a task") + get_all_tasks_resp = session.post(url("api/catalogue/getCatalogueTreeData"), json={ + "sortValue": "", + "sortType": "" + }) + assertRespOk(get_all_tasks_resp, "Get job details") + data_list: list[dict] = get_all_tasks_resp.json()['data'] + return getTask(data_list, name) + + def getFlinkTaskStatus(self, jobInstanceId: int) -> str: + """ + en: Obtain the status of a Flink task + zh: 获取Flink 任务状态 + :param jobInstanceId: job instance id + :return: status + """ + run_task_resp = self.session.get(url(f"api/jobInstance/refreshJobInfoDetail?id={jobInstanceId}&isForce=false")) + assertRespOk(run_task_resp, "Get Task Status") + return run_task_resp.json()['data']['instance']['status'] + + def runTask(self, taskId: int) -> int: + """ + en:Run a task + zh:运行一个任务 + :param taskId: task id + :return: + """ + + run_task_resp = self.session.get(url(f"api/task/submitTask?id={taskId}")) + assertRespOk(run_task_resp, "Run Task") + return run_task_resp.json()['data']['jobInstanceId'] + + def runFlinkTask(self, modes: list[FlinkRunMode] = FlinkRunMode.getAllMode(), wait_time: int = 10, + is_async: bool = False): + name = self.name + statement = self.statement + parent_id = self.parent_id + log.info( + f"======================\nA Flink task is currently executed,name: {name}, statement: \n{statement}\n ======================") + + def taskFunc(mode: 
FlinkRunMode): + flink_task_name = name + "-" + mode.name + task = self.addTask(flink_task_name, parent_id, "FlinkSql", statement, mode) + job_instance_id = self.runTask(task.task_id) + sleep(wait_time) + log.info(f"正在检查:{flink_task_name}任务状态") + status = self.getFlinkTaskStatus(job_instance_id) + assertFlinkTaskIsRunning(status, flink_task_name) + + if is_async: + with concurrent.futures.ThreadPoolExecutor() as executor: + results = [executor.submit(taskFunc, model ) for model in modes] + for result in results: + result.result() + else: + for mode in modes: + taskFunc(mode) + + def assertFlinkTaskIsRunning(status: str, name: str): # todo 这里应该判断flink是否有抛出异常,而不是只有状态 if status != "RUNNING": - raise Exception(f"Flink name:{name} is not RUNNING,current status:{status}") + error = f"Flink name:{name} is not RUNNING,current status:{status}" + log.error(error) + raise Exception(error) def getTask(data_list: list[dict], name: str) -> CatalogueTree: @@ -44,109 +161,3 @@ def addCatalogue(session: requests.Session, name: str, isLeaf: bool = False, par assertRespOk(get_all_tasks_resp, "Get job details") data_list: list[dict] = get_all_tasks_resp.json()['data'] return getTask(data_list, name) - - -def addTask(session: requests.Session, name: str, parent_id: int = 0, dialect: str = "FlinkSql", - statement: str = "", runtModel: str = "local", clusterId: int = -1) -> CatalogueTree: - """ - en: Add a task - zh: 添加一个任务 - :param session: requests.Session - :param name: task name - :param parent_id: dir id - :param type: task type - :param statement: statement - :return CatalogueTree - """ - add_parent_dir_resp = session.put(url("api/catalogue/saveOrUpdateCatalogueAndTask"), json={ - "name": name, - "type": dialect, - "firstLevelOwner": 1, - "task": { - "savePointStrategy": 0, - "parallelism": 1, - "envId": -1, - "step": 1, - "alertGroupId": -1, - "type": runtModel, - "dialect": dialect, - "statement": statement, - "firstLevelOwner": 1, - "clusterId":clusterId - }, - "isLeaf": False, - "parentId": parent_id - }) - assertRespOk(add_parent_dir_resp, "Create a task") - get_all_tasks_resp = session.post(url("api/catalogue/getCatalogueTreeData"), json={ - "sortValue": "", - "sortType": "" - }) - assertRespOk(get_all_tasks_resp, "Get job details") - data_list: list[dict] = get_all_tasks_resp.json()['data'] - return getTask(data_list, name) - - -def runTask(session: requests.Session, taskId: int) -> int: - """ - en:Run a task - zh:运行一个任务 - :param session: requests.Session - :param taskId: task id - :return: - """ - run_task_resp = session.get(url(f"api/task/submitTask?id={taskId}")) - assertRespOk(run_task_resp, "Run Task") - return run_task_resp.json()['data']['jobInstanceId'] - - -def getFlinkTaskStatus(session: requests.Session, jobInstanceId: int) -> str: - """ - en: Obtain the status of a Flink task - zh: 获取Flink 任务状态 - :param session: requests.Session - :param jobInstanceId: job instance id - :return: status - """ - run_task_resp = session.get(url(f"api/jobInstance/refreshJobInfoDetail?id={jobInstanceId}&isForce=false")) - assertRespOk(run_task_resp, "Get Task Status") - return run_task_resp.json()['data']['instance']['status'] - - -def runFlinkLocalTask(session: requests.Session, parentId: int, name: str, statement: str, waitTime: int = 10) -> None: - """ - en: Run a FlinkLocal task - zh: 运行一个 FlinkLocal任务 - :param session: requests.Session - :param parentId: dir id - :param name: task name - :param statement: statement - :param waitTime: zh:等待时间 - """ - log.info( - f"======================\nA Local Flink 
task is currently executed,name: {name}, statement: \n{statement}\n ======================") - task = addTask(session, name, parentId, "FlinkSql", statement) - jobInstanceId = runTask(session, task.taskId) - sleep(waitTime) - status = getFlinkTaskStatus(session, jobInstanceId) - assertFlinkTaskIsRunning(status, name) - - -def runFlinkSessionTask(session: requests.Session, parentId: int,clusterId:int, name: str, statement: str, - waitTime: int = 10) -> None: - """ - en: Run a FlinkLocal task - zh: 运行一个 FlinkLocal任务 - :param session: requests.Session - :param parentId: dir id - :param name: task name - :param statement: statement - :param waitTime: zh:等待时间 - """ - log.info( - f"======================\nA Session Flink task is currently executed,name: {name}, statement: \n{statement}\n ======================") - task = addTask(session, name, parentId, "FlinkSql", statement,"standalone",clusterId) - jobInstanceId = runTask(session, task.taskId) - sleep(waitTime) - status = getFlinkTaskStatus(session, jobInstanceId) - assertFlinkTaskIsRunning(status, name)
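For local debugging of the new e2e tooling outside the workflow, a minimal driver along the lines of e2e_test/tools/main.py might look like the sketch below. Assumptions: the docker-compose environments from this diff are already up, the script lives in e2e_test/tools (so env.py, login.py, task.py and httpUtil.py are importable), and the Dinky address is passed as the first CLI argument exactly as httpUtil.py expects; the file name, catalogue/task names, and the SQL are illustrative only, not part of this PR.

```python
# Hypothetical local driver, e.g. e2e_test/tools/local_smoke.py -- not part of this PR.
# Invoke as:  python local_smoke.py localhost:8888
# (httpUtil.py reads the Dinky address from sys.argv[1] at import time.)
import requests

from env import addStandaloneCluster
from login import login
from task import addCatalogue, Task, FlinkRunMode

if __name__ == "__main__":
    session = requests.session()
    login(session)                              # authenticate against the Dinky REST API
    cluster_id = addStandaloneCluster(session)  # register the standalone session cluster
    catalogue = addCatalogue(session, "local-smoke-test")

    # Illustrative unbounded job so the RUNNING check inside runFlinkTask has something to see.
    sql = (
        "CREATE TABLE src (id INT) WITH ('connector' = 'datagen', 'rows-per-second' = '1');\n"
        "CREATE TABLE snk (id INT) WITH ('connector' = 'print');\n"
        "INSERT INTO snk SELECT id FROM src;"
    )

    # yarn_cluster_id is only consulted for YARN_APPLICATION mode, so -1 is fine here;
    # use addYarnCluster(session) instead when the Hadoop compose stack is reachable.
    task = Task(session, cluster_id, -1, catalogue.id, "local-smoke-test-job", sql)
    task.runFlinkTask(modes=[FlinkRunMode.LOCAL, FlinkRunMode.STANDALONE],
                      wait_time=10, is_async=False)
```

In CI the same flow runs inside the python:3.9 container started by the Run Docker Python Script step, with dinky:8888 as the address and all three run modes submitted concurrently via is_async=True.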