Skip to content

Commit

Permalink
Renaming project from Flux to Airflow
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Oct 15, 2014
1 parent 323571e commit 88c9962
Show file tree
Hide file tree
Showing 58 changed files with 164 additions and 165 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Flux
Airflow
====
Flux is a system to programmaticaly author, schedule and monitor data pipelines.
Airflow is a system to programmaticaly author, schedule and monitor data pipelines.

[Documentation](https://readthedocs.org/dashboard/flux-data-pipelines/)
File renamed without changes.
11 changes: 5 additions & 6 deletions flux/bin/flux → airflow/bin/airflow
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#!/usr/bin/env python

from flux import settings
from flux.models import DagBag, TaskInstance, DagPickle, State
from sqlalchemy.ext.declarative import declarative_base
from airflow import settings
from airflow.models import DagBag, TaskInstance, DagPickle, State

import dateutil.parser
from datetime import datetime
Expand Down Expand Up @@ -185,7 +184,7 @@ def initdb(args):
"Proceed? (y/n)").upper() == "Y":
logging.basicConfig(level=logging.DEBUG)

from flux import models
from airflow import models

logging.info("Dropping tables that exist")
models.Base.metadata.drop_all(settings.engine)
Expand All @@ -198,8 +197,8 @@ def initdb(args):
session.query(models.DatabaseConnection).delete()
mysqldb = models.DatabaseConnection(
db_id='local_mysql', db_type='mysql',
host='localhost', login='flux', password='flux',
schema='flux')
host='localhost', login='airflow', password='airflow',
schema='airflow')
session.add(mysqldb)
session.commit()

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import logging
import time

from flux import settings
from flux.utils import State
from airflow import settings
from airflow.utils import State


class BaseExecutor(object):
Expand Down Expand Up @@ -74,7 +74,7 @@ def run(self):
print command
command = (
"exec bash -c '"
"cd $FLUX_HOME;\n" +
"cd $AIRFLOW_HOME;\n" +
"source init.sh;\n" +
command +
"'"
Expand All @@ -91,7 +91,7 @@ def run(self):

class LocalExecutor(BaseExecutor):

def __init__(self, parallelism=16):
def __init__(self, parallelism=8):
super(LocalExecutor, self).__init__()
self.parallelism = parallelism

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import sys
import os
from flux import settings
from airflow import settings

# Adding the Hive python libs to python path
sys.path.insert(0, settings.HIVE_HOME_PY)
Expand All @@ -13,7 +13,7 @@
from hive_metastore import ThriftHiveMetastore
from hive_service import ThriftHive

from flux.hooks.base_hook import BaseHook
from airflow.hooks.base_hook import BaseHook

METASTORE_THRIFT_HOST = "localhost"
METASTORE_THRIFT_PORT = 10000
Expand Down
4 changes: 2 additions & 2 deletions flux/hooks/mysql_hook.py → airflow/hooks/mysql_hook.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import MySQLdb
from flux import settings
from flux.models import DatabaseConnection
from airflow import settings
from airflow.models import DatabaseConnection


class MySqlHook(object):
Expand Down
File renamed without changes.
12 changes: 6 additions & 6 deletions flux/models.py → airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from flux import macros
from flux.executors import DEFAULT_EXECUTOR
from flux import settings
from flux import utils
from airflow import macros
from airflow.executors import DEFAULT_EXECUTOR
from airflow import settings
from airflow import utils
from settings import ID_LEN
import socket
from utils import State
Expand Down Expand Up @@ -190,7 +190,7 @@ def command(
force=False,
pickle=None):
"""
Returns a command that can be executed anywhere where flux is
Returns a command that can be executed anywhere where airflow is
installed. This command is part of the message sent to executors by
the orchestrator.
"""
Expand All @@ -203,7 +203,7 @@ def command(
if not pickle and self.task.dag and self.task.dag.filepath:
subdir = "-sd {0}".format(self.task.dag.filepath)
return (
"flux run "
"airflow run "
"{self.dag_id} {self.task_id} {iso} "
"{mark_success} "
"{pickle} "
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import logging
from time import sleep

from flux import settings
from flux.models import BaseOperator, TaskInstance, State
from flux.hooks import MySqlHook
from airflow import settings
from airflow.models import BaseOperator, TaskInstance, State
from airflow.hooks import MySqlHook


class BaseSensorOperator(BaseOperator):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from flux.models import BaseOperator
from airflow.models import BaseOperator
from subprocess import Popen, PIPE


Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from flux.models import BaseOperator
from airflow.models import BaseOperator


class DummyOperator(BaseOperator):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from flux.models import BaseOperator
from flux.hooks import MySqlHook
from airflow.models import BaseOperator
from airflow.hooks import MySqlHook


class MySqlOperator(BaseOperator):
Expand Down
16 changes: 8 additions & 8 deletions flux/settings.py → airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,24 @@
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

if 'FLUX_HOME' not in os.environ:
os.environ['FLUX_HOME'] = os.path.join(os.path.dirname(__file__), "..")
FLUX_HOME = os.environ['FLUX_HOME']
if 'AIRFLOW_HOME' not in os.environ:
os.environ['AIRFLOW_HOME'] = os.path.join(os.path.dirname(__file__), "..")
AIRFLOW_HOME = os.environ['AIRFLOW_HOME']

BASE_FOLDER = FLUX_HOME + '/flux'
BASE_FOLDER = AIRFLOW_HOME + '/airflow'
if BASE_FOLDER not in sys.path:
sys.path.append(BASE_FOLDER)
DAGS_FOLDER = FLUX_HOME + '/dags'
BASE_LOG_FOLDER = FLUX_HOME + "/logs"
DAGS_FOLDER = AIRFLOW_HOME + '/dags'
BASE_LOG_FOLDER = AIRFLOW_HOME + "/logs"
HIVE_HOME_PY = '/usr/lib/hive/lib/py'
RUN_AS_MASTER = True
JOB_HEARTBEAT_SEC = 5
ID_LEN = 250 # Used for dag_id and task_id VARCHAR length
LOG_FORMAT = \
'[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s'
Session = sessionmaker()
engine = create_engine('mysql://flux:flux@localhost/flux')
# engine = create_engine('sqlite:///' + BASE_FOLDER + '/flux.db' )
engine = create_engine('mysql://airflow:airflow@localhost/airflow')
# engine = create_engine('sqlite:///' + BASE_FOLDER + '/airflow.db' )
Session.configure(bind=engine)
HEADER = """\
_____ __
Expand Down
2 changes: 1 addition & 1 deletion flux/utils.py → airflow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def validate_key(k, max_length=250):
elif not re.match(r'^[A-Za-z0-9_]+$', k):
raise Exception(
"The key has to be made of alphanumeric characters and "
"undersfluxs exclusively")
"undersairflows exclusively")
else:
return True

Expand Down
File renamed without changes.
26 changes: 13 additions & 13 deletions flux/www/app.py → airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@
import markdown
import chartkick

from flux.settings import Session
from flux import models
from flux.models import State
from flux import settings
from flux import utils
from airflow.settings import Session
from airflow import models
from airflow.models import State
from airflow import settings
from airflow import utils

dagbag = models.DagBag(settings.DAGS_FOLDER)
session = Session()

app = Flask(__name__)
app.secret_key = 'fluxified'
app.secret_key = 'airflowified'

# Init for chartkick, the python wrapper for highcharts
ck = Blueprint(
Expand All @@ -54,13 +54,13 @@ class HomeView(AdminIndexView):
@expose("/")
def index(self):
md = "".join(
open(settings.FLUX_HOME + '/README.md', 'r').readlines())
open(settings.AIRFLOW_HOME + '/README.md', 'r').readlines())
content = Markup(markdown.markdown(md))
return self.render('admin/index.html', content=content)
admin = Admin(app, name="Flux", index_view=HomeView(name='Home'))
admin = Admin(app, name="Airflow", index_view=HomeView(name='Home'))


class Flux(BaseView):
class Airflow(BaseView):

def is_visible(self):
return False
Expand Down Expand Up @@ -145,7 +145,7 @@ def tree(self):
downstream=downstream)

flash("{0} task instances have been cleared".format(count))
return redirect(url_for('flux.tree', dag_id=dag_id))
return redirect(url_for('airflow.tree', dag_id=dag_id))

dates = utils.date_range(
from_date, base_date, dag.schedule_interval)
Expand Down Expand Up @@ -369,7 +369,7 @@ def gantt(self):
)


admin.add_view(Flux(name='DAGs'))
admin.add_view(Airflow(name='DAGs'))

# ------------------------------------------------
# Leveraging the admin for CRUD and browse on models
Expand All @@ -388,14 +388,14 @@ class ModelViewOnly(ModelView):


def filepath_formatter(view, context, model, name):
url = url_for('flux.code', dag_id=model.dag_id)
url = url_for('airflow.code', dag_id=model.dag_id)
short_fp = model.filepath.replace(settings.BASE_FOLDER + '/dags/', '')
link = Markup('<a href="{url}">{short_fp}</a>'.format(**locals()))
return link


def dag_formatter(view, context, model, name):
url = url_for('flux.tree', dag_id=model.dag_id, num_runs=45)
url = url_for('airflow.tree', dag_id=model.dag_id, num_runs=45)
link = Markup('<a href="{url}">{model.dag_id}</a>'.format(**locals()))
return link

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
1 change: 1 addition & 0 deletions airflow/www/static/docs
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes
File renamed without changes
File renamed without changes
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{% extends "admin/dag.html" %}
{% block title %}Flux - DAGs{% endblock %}
{% block title %}Airflow - DAGs{% endblock %}

{% block tail %}
{{ super() }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{% extends "admin/dag.html" %}
{% block title %}Flux - DAGs{% endblock %}
{% block title %}Airflow - DAGs{% endblock %}

{% block body %}
{{ super() }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{% extends "admin/master.html" %}

{% block title %}Flux - DAGs{% endblock %}
{% block title %}Airflow - DAGs{% endblock %}

{% block head_css %}
{{ super() }}
Expand All @@ -11,29 +11,29 @@
{% block body %}
<h2>DAG: {{ dag.dag_id }}</h2>
<ul class="nav nav-pills">
<li><a href="{{ url_for("flux.tree", dag_id=dag.dag_id, num_runs=45) }}">
<li><a href="{{ url_for("airflow.tree", dag_id=dag.dag_id, num_runs=45) }}">
<i class="icon-plus-sign"></i>Tree View
</a></li>
<li><a href="{{ url_for("flux.graph", dag_id=dag.dag_id) }}">
<li><a href="{{ url_for("airflow.graph", dag_id=dag.dag_id) }}">
<i class="icon-random"></i>
Graph View</a></li>
<li><a href="{{ url_for("flux.duration", dag_id=dag.dag_id) }}">
<li><a href="{{ url_for("airflow.duration", dag_id=dag.dag_id) }}">
<i class="icon-signal"></i>Task Duration
</a></li>
<li>
<a href="{{ url_for("flux.landing_times", dag_id=dag.dag_id) }}">
<a href="{{ url_for("airflow.landing_times", dag_id=dag.dag_id) }}">
<i class="icon-signal"></i>
Landing Times
</a>
</li>
<li>
<a href="{{ url_for("flux.gantt", dag_id=dag.dag_id) }}">
<a href="{{ url_for("airflow.gantt", dag_id=dag.dag_id) }}">
<i class="icon-align-left"></i>
Gantt
</a>
</li>
<li>
<a href="{{ url_for("flux.code", dag_id=dag.dag_id) }}">
<a href="{{ url_for("airflow.code", dag_id=dag.dag_id) }}">
<i class="icon-barcode"></i>
Code
</a>
Expand Down Expand Up @@ -115,15 +115,15 @@ <h4 class="modal-title" id="myModalLabel">
}

$("#btn_log").click(function(){
url = "{{ url_for('flux.log') }}" +
url = "{{ url_for('airflow.log') }}" +
"?task_id=" + task_id +
"&dag_id=" + dag_id +
"&execution_date=" + execution_date;
window.location = url;
});

$("#btn_clear").click(function(){
url = "{{ url_for('flux.tree') }}" +
url = "{{ url_for('airflow.tree') }}" +
"?action=clear" +
"&task_id=" + task_id +
"&dag_id=" + dag_id +
Expand All @@ -136,14 +136,14 @@ <h4 class="modal-title" id="myModalLabel">
});

$("#btn_gantt").click(function(){
url = "{{ url_for('flux.gantt') }}" +
url = "{{ url_for('airflow.gantt') }}" +
"?dag_id=" + dag_id +
"&execution_date=" + execution_date;
window.location = url;
});

$("#btn_graph").click(function(){
url = "{{ url_for('flux.graph') }}" +
url = "{{ url_for('airflow.graph') }}" +
"?dag_id=" + dag_id +
"&execution_date=" + execution_date;
window.location = url;
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
{% import 'admin/lib.html' as lib with context %}
{% import 'admin/static.html' as admin_static with context %}

{% block title %}Flux - DAGs{% endblock %}
{% block title %}Airflow - DAGs{% endblock %}

{% block head_css %}
{{ super() }}
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{% extends "admin/dag.html" %}
{% block title %}Flux - DAGs{% endblock %}
{% block title %}Airflow - DAGs{% endblock %}

{% block head_css %}
{{ super() }}
Expand Down
Loading

0 comments on commit 88c9962

Please sign in to comment.