forked from RealImpactAnalytics/airflow
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit 1047940
Showing
52 changed files
with
5,023 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
env | ||
logs | ||
build | ||
*.pyc |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
Flux | ||
==== | ||
Flux is a system to programmaticaly author, schedule and monitor data pipelines. | ||
|
||
Datanauts write Python that define workflows as directed acyclic graphs (DAGs) of data related tasks. They can run tasks and workflow for different timeframe interactively, and commiting their pipelines into production is all it takes for the master scheduler to run the pipelines with the schedule and dependencies specified. | ||
|
||
The Flux UI make it easy to visualize the pipelines running in production, their hierachy, progress and exposes way to pinpoint and troubleshoot issues when needed. | ||
|
||
### Principles | ||
* **Interactivity:** the libraries are intuitive so that writting / testing and monitoring pipelines from a shell (or an iPython Notebook) should just flow | ||
* **Dynamic:** Flux has intrinsec support for dynamic pipeline generation: you can write pipelines, as well as writing code that defines pipeline. | ||
* **Extensible:** easily define your own operators, executors and extend the library so that it fits the level of abstraction that suits your environment | ||
* **Elegant:** make your commands dynamic with the power of the **Jinja** templating engine | ||
* **Distributed:** Distributed scheduling, workers trigger downstream jobs, a job supervisor triggers tasks instances, an hypervisor monitors jobs and restart them when no heartbeats are detected. | ||
|
||
### Concepts | ||
##### Operators | ||
An operator allows to generate a certain type of task on the graph. There are 3 main type of operators: | ||
|
||
* **Sensor:** Waits for events to happen, it could be a file appearing in HDFS, the existance of a Hive partition or for an arbitrary MySQL query to return a row. | ||
* **Remote Execution:** Trigger an operation in a remote system, this could be a HQL statement in Hive, a Pig script, a map reduce job, a stored procedure in Oracle or a Bash script to run. | ||
* **Data transfers:** Move data from a system to another. Push data from Hive to MySQL, from a local file to HDFS, from Postgres to Oracle, or anything of that nature. | ||
##### Tasks | ||
A task represent the instantiation of an operator and becomes a node in the directed acyclic graph (DAG). The instantiation defines specific values when calling the abstract operator. A task would be waiting for a specific partition in Hive, or triggerring a specific DML statement in Oracle. | ||
##### Task instances | ||
A task instance represents a task run, for a specific point in time. While the task defines a start datetime and a schedule (say every hour or every day), a task instance represents a specific run of a task. A task instance will have a status of either "started", "retrying", "failed" or "success" | ||
|
||
Installation | ||
------------ | ||
##### Debian packages | ||
sudo apt-get install virtualenv python-dev | ||
sudo apt-get install libmysqlclient-dev mysql-server | ||
sudo apt-get g++ | ||
##### Create a python virtualenv | ||
virtualenv env # creates the environment | ||
source init.sh # activates the environment | ||
##### Use pip to install the python packages required by Flux | ||
pip install -r requirements.txt | ||
##### Setup the metdata database | ||
Here are steps to get started using MySQL as a backend for the metadata database, though any backend supported by SqlAlquemy should work just fine. | ||
|
||
$ mysql -u root -p | ||
mysql> CREATE DATABASE flux; | ||
CREATE USER 'flux'@'localhost' IDENTIFIED BY 'flux'; | ||
GRANT ALL PRIVILEGES ON flux.* TO 'flux'@'localhost'; | ||
|
||
TODO | ||
----- | ||
#### UI | ||
* Tree view: remove dummy root node | ||
* Graph view add tooltip | ||
* Backfill wizard | ||
* Fix datepicker | ||
|
||
#### Command line | ||
* Add support for including upstream and downstream | ||
#### Write unittests | ||
* For each existing operator | ||
#### More Operators! | ||
* HIVE | ||
* BaseDataTransferOperator | ||
* File2MySqlOperator | ||
* PythonOperator | ||
* DagTaskSensor for cross dag dependencies | ||
* PIG | ||
#### Macros | ||
* Hive latest partition | ||
* Previous execution timestamp | ||
* ... | ||
#### Backend | ||
* LocalExecutor, ctrl-c should result in error state instead of forever running | ||
* CeleryExecutor | ||
* Clear should kill running jobs | ||
#### Misc | ||
* BaseJob | ||
* DagBackfillJob | ||
* TaskIntanceJob | ||
* ClearJob? | ||
* Write an hypervisor, looks for dead jobs without a heartbeat and kills | ||
* Authentication with Flask-Login and Flask-Principal | ||
* email_on_retry | ||
|
||
#### Testing required | ||
* result queue with remove descendants | ||
|
||
#### Wishlist | ||
* Jobs can send pickles of local version to remote executors? | ||
* Support for cron like synthax (0 * * * ) using croniter library |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from models import DAG |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,243 @@ | ||
#!python | ||
|
||
import dateutil.parser | ||
import logging | ||
import os | ||
import signal | ||
import sys | ||
from time import sleep | ||
|
||
|
||
BASE_FOLDER = os.path.abspath(__file__ + "/../../../") | ||
if BASE_FOLDER not in sys.path: | ||
sys.path.append(BASE_FOLDER) | ||
import argparse | ||
from core import settings | ||
from core.models import DagBag | ||
from core.models import TaskInstance | ||
from core.models import State | ||
from datetime import datetime | ||
from sqlalchemy import func | ||
|
||
# Common help text across subcommands | ||
mark_success_help = "Mark jobs as succeeded without running them" | ||
subdir_help = "File location or directory from which to look for the dag" | ||
|
||
|
||
def backfill(args): | ||
logging.basicConfig(level=logging.INFO) | ||
dagbag = DagBag(args.subdir) | ||
if args.dag_id not in dagbag.dags: | ||
raise Exception('dag_id could not be found') | ||
dag = dagbag.dags[args.dag_id] | ||
|
||
if args.start_date: | ||
args.start_date = datetime.strptime(args.start_date, '%Y-%m-%d') | ||
if args.end_date: | ||
args.end_date = datetime.strptime(args.end_date, '%Y-%m-%d') | ||
|
||
if args.task_regex: | ||
dag = dag.sub_dag(task_regex=args.task_regex) | ||
|
||
dag.run( | ||
start_date=args.start_date, | ||
end_date=args.end_date, | ||
mark_success=args.mark_success) | ||
|
||
|
||
def run(args): | ||
args.execution_date = dateutil.parser.parse(args.execution_date) | ||
iso = args.execution_date.isoformat() | ||
directory = settings.BASE_LOG_FOLDER + \ | ||
"/{args.dag_id}/{args.task_id}".format(args=args) | ||
if not os.path.exists(directory): | ||
os.makedirs(directory) | ||
filename = "{directory}/{iso}".format(**locals()) | ||
logging.basicConfig( | ||
filename=filename, level=logging.INFO, format=settings.LOG_FORMAT) | ||
|
||
print("Logging into: " + filename) | ||
dagbag = DagBag(args.subdir) | ||
if args.dag_id not in dagbag.dags: | ||
raise Exception('dag_id could not be found') | ||
dag = dagbag.dags[args.dag_id] | ||
task = dag.get_task(task_id=args.task_id) | ||
|
||
# This is enough to fail the task instance | ||
def signal_handler(signum, frame): | ||
logging.error("SIGINT (ctrl-c) received".format(args.task_id)) | ||
|
||
signal.signal(signal.SIGINT, signal_handler) | ||
|
||
task.run( | ||
start_date=args.execution_date, | ||
end_date=args.execution_date, | ||
mark_success=args.mark_success, | ||
force=args.force, | ||
ignore_dependencies=args.ignore_dependencies) | ||
|
||
|
||
def clear(args): | ||
logging.basicConfig(level=logging.INFO) | ||
dagbag = DagBag(args.subdir) | ||
if args.dag_id not in dagbag.dags: | ||
raise Exception('dag_id could not be found') | ||
dag = dagbag.dags[args.dag_id] | ||
|
||
if args.start_date: | ||
args.start_date = datetime.strptime(args.start_date, '%Y-%m-%d') | ||
if args.end_date: | ||
args.end_date = datetime.strptime(args.end_date, '%Y-%m-%d') | ||
|
||
if args.task_regex: | ||
dag = dag.sub_dag(task_regex=args.task_regex) | ||
dag.clear( | ||
start_date=args.start_date, end_date=args.end_date) | ||
|
||
|
||
def webserver(args): | ||
logging.basicConfig(level=logging.DEBUG, format=settings.LOG_FORMAT) | ||
print(settings.HEADER) | ||
from www.app import app | ||
print("Starting the web server on port {0}.".format(args.port)) | ||
app.run(debug=True, port=args.port) | ||
|
||
|
||
def master(args): | ||
|
||
logging.basicConfig(level=logging.DEBUG) | ||
logging.info("Starting a master scheduler") | ||
|
||
session = settings.Session() | ||
TI = TaskInstance | ||
# This should get new code | ||
dagbag = DagBag(args.subdir) | ||
executor = dagbag.executor() | ||
executor.start() | ||
if args.dag_id: | ||
dags = [dagbag.dags[args.dag_id]] | ||
else: | ||
dags = dagbag.dags.values() | ||
while True: | ||
for dag in dags: | ||
logging.info( | ||
"Getting latest instance for all task in dag " + dag.dag_id) | ||
sq = session.query( | ||
TI.task_id, | ||
func.max(TI.execution_date).label('max_ti') | ||
).filter( | ||
TI.dag_id == dag.dag_id).group_by(TI.task_id).subquery('sq') | ||
qry = session.query(TI).filter( | ||
TI.dag_id == dag.dag_id, | ||
TI.task_id == sq.c.task_id, | ||
TI.execution_date == sq.c.max_ti, | ||
) | ||
latest_ti = qry.all() | ||
ti_dict = {ti.task_id: ti for ti in latest_ti} | ||
session.commit() | ||
|
||
for task in dag.tasks: | ||
if task.task_id not in ti_dict: | ||
# Brand new task, let's get started | ||
ti = TI(task, task.start_date) | ||
executor.queue_command(ti.key, ti.command) | ||
else: | ||
ti = ti_dict[task.task_id] | ||
ti.task = task # Hacky but worky | ||
if ti.state == State.UP_FOR_RETRY: | ||
# If task instance if up for retry, make sure | ||
# the retry delay is met | ||
if ti.is_runnable(): | ||
executor.queue_command(ti.key, ti.command) | ||
elif ti.state == State.RUNNING: | ||
# Only one task at a time | ||
continue | ||
else: | ||
# Trying to run the next schedule | ||
ti = TI( | ||
task=task, | ||
execution_date=ti.execution_date + | ||
task.schedule_interval | ||
) | ||
ti.refresh_from_db() | ||
if ti.is_runnable(): | ||
executor.queue_command(ti.key, ti.command) | ||
session.close() | ||
sleep(5) | ||
executor.end() | ||
|
||
|
||
if __name__ == '__main__': | ||
parser = argparse.ArgumentParser() | ||
subparsers = parser.add_subparsers(help='sub-command help') | ||
|
||
ht = "Run subsections of a DAG for a specified date range" | ||
parser_backfill = subparsers.add_parser('backfill', help=ht) | ||
parser_backfill.add_argument("dag_id", help="The id of the dag to run") | ||
parser_backfill.add_argument( | ||
"-t", "--task_regex", | ||
help="The regex to filter specific task_ids to backfill (optional)") | ||
parser_backfill.add_argument( | ||
"-s", "--start_date", help="Overide start_date YYYY-MM-DD") | ||
parser_backfill.add_argument( | ||
"-e", "--end_date", help="Overide end_date YYYY-MM-DD") | ||
parser_backfill.add_argument( | ||
"-m", "--mark_success", | ||
help=mark_success_help, action="store_true") | ||
parser_backfill.add_argument( | ||
"-sd", "--subdir", help=subdir_help, | ||
default=settings.DAGS_FOLDER) | ||
parser_backfill.set_defaults(func=backfill) | ||
|
||
ht = "Clear a set of task instance, as if they never ran" | ||
parser_clear = subparsers.add_parser('clear', help=ht) | ||
parser_clear.add_argument("dag_id", help="The id of the dag to run") | ||
parser_clear.add_argument( | ||
"-t", "--task_regex", | ||
help="The regex to filter specific task_ids to clear (optional)") | ||
parser_clear.add_argument( | ||
"-s", "--start_date", help="Overide start_date YYYY-MM-DD") | ||
parser_clear.add_argument( | ||
"-e", "--end_date", help="Overide end_date YYYY-MM-DD") | ||
parser_clear.add_argument( | ||
"-sd", "--subdir", help=subdir_help, | ||
default=settings.DAGS_FOLDER) | ||
parser_clear.set_defaults(func=clear) | ||
|
||
ht = "Run a single task instance" | ||
parser_run = subparsers.add_parser('run', help=ht) | ||
parser_run.add_argument("dag_id", help="The id of the dag to run") | ||
parser_run.add_argument("task_id", help="The task_id to run") | ||
parser_run.add_argument( | ||
"execution_date", help="The execution date to run") | ||
parser_run.add_argument( | ||
"-sd", "--subdir", help=subdir_help, default=settings.DAGS_FOLDER) | ||
parser_run.add_argument( | ||
"-m", "--mark_success", help=mark_success_help, action="store_true") | ||
parser_run.set_defaults(func=run) | ||
ht = "Force a run regardless or previous success" | ||
parser_run.add_argument( | ||
"-f", "--force", help=ht, action="store_true") | ||
ht = "Ignore upstream and depends_on_past dependencies" | ||
parser_run.add_argument( | ||
"-i", "--ignore_dependencies", help=ht, action="store_true") | ||
|
||
ht = "Start a Flux webserver instance" | ||
parser_webserver = subparsers.add_parser('webserver', help=ht) | ||
parser_webserver.add_argument( | ||
"-p", "--port", | ||
default=8080, | ||
type=int, | ||
help="Set the port on which to run the web server") | ||
parser_webserver.set_defaults(func=webserver) | ||
|
||
ht = "Start a master scheduler instance" | ||
parser_master = subparsers.add_parser('master', help=ht) | ||
parser_master.add_argument( | ||
"-d", "--dag_id", help="The id of the dag to run") | ||
parser_master.add_argument( | ||
"-sd", "--subdir", help=subdir_help, default=settings.DAGS_FOLDER) | ||
parser_master.set_defaults(func=master) | ||
|
||
args = parser.parse_args() | ||
args.func(args) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from base_executor import LocalExecutor | ||
from base_executor import SequentialExecutor | ||
DEFAULT_EXECUTOR = LocalExecutor |
Oops, something went wrong.