diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c69c59d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,12 @@ +blinker==1.8.2 +certifi==2024.7.4 +charset-normalizer==3.3.2 +click==8.1.7 +Flask==3.0.3 +idna==3.7 +itsdangerous==2.2.0 +Jinja2==3.1.4 +MarkupSafe==2.1.5 +requests==2.32.3 +urllib3==2.2.2 +Werkzeug==3.0.3 diff --git a/src/DugLakefsEventListeners/__init__.py b/src/DugLakefsEventListeners/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/DugLakefsEventListeners/actions/__init__.py b/src/DugLakefsEventListeners/actions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/DugLakefsEventListeners/data/invoke_airflow_with_diff_config.json b/src/DugLakefsEventListeners/data/invoke_airflow_with_diff_config.json new file mode 100644 index 0000000..e9ec9bd --- /dev/null +++ b/src/DugLakefsEventListeners/data/invoke_airflow_with_diff_config.json @@ -0,0 +1,10 @@ +{ + "dags": [ + { + "dag_id": "example_bash_operator", + "repository_id": "repo1", + "branch_name": "branch1", + "last_commit_id": "cad1136eb748603c3379cda715d23422126af9ae6ebd541a52e28c61011bd57c" + } + ] +} \ No newline at end of file diff --git a/src/DugLakefsEventListeners/main.py b/src/DugLakefsEventListeners/main.py new file mode 100644 index 0000000..a016910 --- /dev/null +++ b/src/DugLakefsEventListeners/main.py @@ -0,0 +1,70 @@ +import json +import logging +import os +import requests + +from flask import Flask, request, jsonify + +INVOKE_AIRFLOW_WITH_DIFF = "invoke_airflow_with_diff" +AIRFLOW_URL = "http://airflow:airflow@127.0.0.1:8080/api/v1/" + +app = Flask(__name__) + + +def invoke_airflow_dag(dag_id, repository_id, branch_name, last_commit_id, new_commit_id) -> bool: + params = \ + { + 'conf': + { + 'repository_id': repository_id, + 'branch_name': branch_name, + 'commitid_from' : last_commit_id, + 'commitid_to' : new_commit_id + } + } + resp = requests.post(AIRFLOW_URL + f"dags/{dag_id}/dagRuns", json=params) + logging.info(resp.json()) + if resp.status_code == 200: + return True + else: + return False + + +@app.route('/' + INVOKE_AIRFLOW_WITH_DIFF, methods=["POST"]) +def invoke_airflow_with_diff(): + input_data = request.json + logging.info(INVOKE_AIRFLOW_WITH_DIFF) + logging.info(input_data) + + config = {} + config_json = f"./data/{INVOKE_AIRFLOW_WITH_DIFF}_config.json" + + if not os.path.isfile(config_json): + logging.error(f"{config_json} not found!") + output_data = {'status': 'error'} + return jsonify(output_data) + + with open(config_json, "r") as f: + config = json.load(f) + + repository_id = input_data["repository_id"] + source_ref = input_data["source_ref"] + + dags = config["dags"] + + for dag in dags: + if dag["repository_id"] == repository_id: + res = invoke_airflow_dag(dag["dag_id"], repository_id, dag["branch_name"], dag["last_commit_id"], source_ref) + if res: + dag["last_commit_id"] = source_ref + + with open(config_json, "w") as f: + json.dump(config, f, indent=4) + + output_data = {'status': 'OK'} + return jsonify(output_data) + +if __name__ == '__main__': + AIRFLOW_URL = os.environ.get("AIRFLOW_URL", AIRFLOW_URL) + app.debug = True + app.run(host='0.0.0.0', port=8001) \ No newline at end of file diff --git a/test/post_invoke_airflow_with_diff.http b/test/post_invoke_airflow_with_diff.http new file mode 100644 index 0000000..58b43fe --- /dev/null +++ b/test/post_invoke_airflow_with_diff.http @@ -0,0 +1,14 @@ +### Send POST request with json body +POST http://127.0.0.1:8001/invoke_airflow_with_diff +Content-Type: application/json + +{ + "event_type" : "post-create-tag", + "event_time" : "2024-08-08T00:09:53Z", + "action_name": "SendDiffToAfDag", + "hook_id": "no_temp", + "repository_id": "repo1", + "source_ref" : "cad1136eb748603c3379cda715d23422126af9ae6ebd541a52e28c61011bd57c", + "tag_id" : "erer34", + "commit_id" : "cad1136eb748603c3379cda715d23422126af9ae6ebd541a52e28c61011bd57c" +} \ No newline at end of file