Skip to content

Commit

Permalink
Added trigger invoke_airflow_with_diff
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimir2217 committed Aug 19, 2024
1 parent dc92991 commit 59c0631
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 0 deletions.
12 changes: 12 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
blinker==1.8.2
certifi==2024.7.4
charset-normalizer==3.3.2
click==8.1.7
Flask==3.0.3
idna==3.7
itsdangerous==2.2.0
Jinja2==3.1.4
MarkupSafe==2.1.5
requests==2.32.3
urllib3==2.2.2
Werkzeug==3.0.3
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"dags": [
{
"dag_id": "example_bash_operator",
"repository_id": "repo1",
"branch_name": "branch1",
"last_commit_id": "cad1136eb748603c3379cda715d23422126af9ae6ebd541a52e28c61011bd57c"
}
]
}
70 changes: 70 additions & 0 deletions src/DugLakefsEventListeners/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import json
import logging
import os
import requests

from flask import Flask, request, jsonify

INVOKE_AIRFLOW_WITH_DIFF = "invoke_airflow_with_diff"
AIRFLOW_URL = "http://airflow:[email protected]:8080/api/v1/"

app = Flask(__name__)


def invoke_airflow_dag(dag_id, repository_id, branch_name, last_commit_id, new_commit_id) -> bool:
params = \
{
'conf':
{
'repository_id': repository_id,
'branch_name': branch_name,
'commitid_from' : last_commit_id,
'commitid_to' : new_commit_id
}
}
resp = requests.post(AIRFLOW_URL + f"dags/{dag_id}/dagRuns", json=params)
logging.info(resp.json())
if resp.status_code == 200:
return True
else:
return False


@app.route('/' + INVOKE_AIRFLOW_WITH_DIFF, methods=["POST"])
def invoke_airflow_with_diff():
input_data = request.json
logging.info(INVOKE_AIRFLOW_WITH_DIFF)
logging.info(input_data)

config = {}
config_json = f"./data/{INVOKE_AIRFLOW_WITH_DIFF}_config.json"

if not os.path.isfile(config_json):
logging.error(f"{config_json} not found!")
output_data = {'status': 'error'}
return jsonify(output_data)

with open(config_json, "r") as f:
config = json.load(f)

repository_id = input_data["repository_id"]
source_ref = input_data["source_ref"]

dags = config["dags"]

for dag in dags:
if dag["repository_id"] == repository_id:
res = invoke_airflow_dag(dag["dag_id"], repository_id, dag["branch_name"], dag["last_commit_id"], source_ref)
if res:
dag["last_commit_id"] = source_ref

with open(config_json, "w") as f:
json.dump(config, f, indent=4)

output_data = {'status': 'OK'}
return jsonify(output_data)

if __name__ == '__main__':
AIRFLOW_URL = os.environ.get("AIRFLOW_URL", AIRFLOW_URL)
app.debug = True
app.run(host='0.0.0.0', port=8001)
14 changes: 14 additions & 0 deletions test/post_invoke_airflow_with_diff.http
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
### Send POST request with json body
POST http://127.0.0.1:8001/invoke_airflow_with_diff
Content-Type: application/json

{
"event_type" : "post-create-tag",
"event_time" : "2024-08-08T00:09:53Z",
"action_name": "SendDiffToAfDag",
"hook_id": "no_temp",
"repository_id": "repo1",
"source_ref" : "cad1136eb748603c3379cda715d23422126af9ae6ebd541a52e28c61011bd57c",
"tag_id" : "erer34",
"commit_id" : "cad1136eb748603c3379cda715d23422126af9ae6ebd541a52e28c61011bd57c"
}

0 comments on commit 59c0631

Please sign in to comment.