Changing configuration scheme
mistercrunch committed Jan 16, 2015
1 parent 5455fbc commit f991783
Showing 26 changed files with 184 additions and 613 deletions.
8 changes: 4 additions & 4 deletions airflow/bin/airflow
@@ -253,25 +253,25 @@ def initdb(args):
session.query(models.Connection).delete()
session.add(
models.Connection(
conn_id='local_mysql', db_type='mysql',
conn_id='local_mysql', conn_type='mysql',
host='localhost', login='airflow', password='airflow',
schema='airflow'))
session.commit()
session.add(
models.Connection(
conn_id='mysql_default', db_type='mysql',
conn_id='mysql_default', conn_type='mysql',
host='localhost', login='airflow', password='airflow',
schema='airflow'))
session.commit()
session.add(
models.Connection(
conn_id='presto_default', db_type='presto',
conn_id='presto_default', conn_type='presto',
host='localhost',
schema='hive', port=10001))
session.commit()
session.add(
models.Connection(
conn_id='hive_default', db_type='hive',
conn_id='hive_default', conn_type='hive',
host='localhost',
schema='default', port=10000))
session.commit()
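For context, the hunk above only renames the keyword used when initdb seeds the default connections: every db_type= becomes conn_type=. A minimal sketch of the new call shape, with the imports and session setup that the surrounding initdb code is assumed to provide:

from airflow import models, settings

session = settings.Session()
# The keyword is now conn_type; the other Connection fields are unchanged.
session.add(
    models.Connection(
        conn_id='presto_default', conn_type='presto',
        host='localhost', schema='hive', port=10001))
session.commit()
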
133 changes: 78 additions & 55 deletions airflow/configuration.py
@@ -1,58 +1,81 @@
from ConfigParser import ConfigParser
import errno
import logging
import os

from ConfigParser import ConfigParser, NoOptionError, NoSectionError

class AirflowConfigParser(ConfigParser):
NO_DEFAULT = object()
_instance = None
_config_paths = ['airflow.cfg']
if 'AIRFLOW_CONFIG_PATH' in os.environ:
_config_paths.append(os.environ['AIRFLOW_CONFIG_PATH'])
logging.info("Config paths is " + str(_config_paths))

@classmethod
def add_config_paths(cls, path):
cls._config_paths.append(path)
cls.reload()

@classmethod
def instance(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = cls(*args, **kwargs)
cls._instance.reload()

return cls._instance

@classmethod
def reload(cls):
loaded_obj = cls.instance().read(cls._config_paths)
logging.info("the config object after loading is " + str(loaded_obj))
return loaded_obj

def get_with_default(self, method, section, option, default):
try:
return method(self, section, option)
except (NoOptionError, NoSectionError):
if default is AirflowConfigParser.NO_DEFAULT:
raise
return default

def get(self, section, option, default=NO_DEFAULT):
return self.get_with_default(ConfigParser.get, section, option, default)

def getint(self, section, option, default=NO_DEFAULT):
return self.get_with_default(ConfigParser.getint, section, option, default)

def getboolean(self, section, option, default=NO_DEFAULT):
return self.get_with_default(ConfigParser.getboolean, section, option, default)

def set(self, section, option, value):
if not ConfigParser.has_section(self, section):
ConfigParser.add_section(self, section)
return ConfigParser.set(self, section, option, value)

def getconf():
return AirflowConfigParser.instance()

conf = getconf()
DEFAULT_CONFIG = """\
[core]
airflow_home = {AIRFLOW_HOME}
authenticate = False
dags_folder = {AIRFLOW_HOME}/dags
base_folder = {AIRFLOW_HOME}/airflow
base_url = http://localhost:8080
executor = LocalExecutor
sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/airflow.db
[server]
web_server_host = 0.0.0.0
web_server_port = 8080
[smtp]
smtp_host = localhost
smtp_user = airflow
smtp_port = 25
smtp_password = airflow
smtp_mail_from = [email protected]
[celery]
celery_app_name = airflow.executors.celery_executor
celeryd_concurrency = 16
worker_log_server_port = 8793
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
[hooks]
presto_default_conn_id = presto_default
hive_default_conn_id = hive_default
[misc]
job_heartbeat_sec = 5
id_len = 250
"""

def mkdir_p(path):
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else: raise

'''
Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
"~/airflow" and "~/airflow/airflow.cfg" repectively as defaults.
'''

if 'AIRFLOW_HOME' not in os.environ:
AIRFLOW_HOME = os.path.expanduser('~/airflow')
else:
AIRFLOW_HOME = os.environ['AIRFLOW_HOME']

mkdir_p(AIRFLOW_HOME)

if 'AIRFLOW_CONFIG' not in os.environ:
AIRFLOW_CONFIG = AIRFLOW_HOME + '/airflow.cfg'
else:
AIRFLOW_CONFIG = os.environ['AIRFLOW_CONFIG']

conf = ConfigParser()
if not os.path.isfile(AIRFLOW_CONFIG):
'''
This configuration is used to generate a default configuration file when
it is missing. The right way to change your configuration is to alter your
configuration file, not this code.
'''
logging.info("Createing new config file in: " + AIRFLOW_CONFIG)
f = open(AIRFLOW_CONFIG, 'w')
f.write(DEFAULT_CONFIG.format(**locals()))
f.close()

logging.info("Reading the config from " + AIRFLOW_CONFIG)
conf.read(AIRFLOW_CONFIG)
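
Net effect of the rewrite above: airflow.configuration no longer exposes an AirflowConfigParser singleton through getconf(). It builds a plain ConfigParser named conf at import time, creates AIRFLOW_HOME if needed, and writes airflow.cfg from DEFAULT_CONFIG when the file is missing. A sketch of how a downstream module reads options under the new scheme (option names come from DEFAULT_CONFIG above; the environment override is optional):

import os

# Optional: point Airflow somewhere other than ~/airflow before the import.
os.environ.setdefault('AIRFLOW_HOME', os.path.expanduser('~/airflow'))

from airflow.configuration import conf

# ConfigParser options are case-insensitive, so upper-case names still resolve.
sql_alchemy_conn = conf.get('core', 'SQL_ALCHEMY_CONN')
heartbeat = conf.getint('misc', 'JOB_HEARTBEAT_SEC')

Note that the plain ConfigParser has no default argument on get(), so callers that relied on the old get(section, option, default) signature now have to handle NoOptionError and NoSectionError themselves.
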
File renamed without changes.
34 changes: 20 additions & 14 deletions dags/examples/example1.py → airflow/example_dags/example1.py
@@ -1,51 +1,57 @@
from airflow.operators import BashOperator, MySqlOperator, DummyOperator
from airflow.models import DAG
from airflow.executors import SequentialExecutor
from airflow.executors import LocalExecutor
from datetime import datetime

default_args = {
'owner': 'max',
'start_date': datetime(2014, 11, 1),
'mysql_dbid': 'local_mysql',
'mysql_conn_id': 'local_mysql',
}

dag = DAG(dag_id='example_1', executor=LocalExecutor())
dag = DAG(dag_id='example_1')
# dag = DAG(dag_id='example_1', executor=SequentialExecutor())

cmd = 'ls -l'
run_this_last = DummyOperator(
task_id='run_this_last',
**default_args)
default_args=default_args)
dag.add_task(run_this_last)

run_this = BashOperator(
task_id='run_after_loop', bash_command='echo 1', **default_args)
task_id='run_after_loop', bash_command='echo 1',
default_args=default_args)
dag.add_task(run_this)
run_this.set_downstream(run_this_last)

for i in range(9):
i = str(i)
task = BashOperator(
task_id='runme_'+i,
bash_command='sleep 20',
**default_args)
task_id='runme_'+i,
bash_command='sleep 20',
default_args=default_args)
task.set_downstream(run_this)
dag.add_task(task)

task = BashOperator(
task_id='also_run_this', bash_command='ls -l', **default_args)
task_id='also_run_this',
bash_command='ls -l',
default_args=default_args)
dag.add_task(task)
task.set_downstream(run_this_last)

sql = "CREATE TABLE IF NOT EXISTS deleteme (col INT);"
create_table = MySqlOperator(
task_id='create_table_mysql', sql=sql, **default_args)
task_id='create_table_mysql',
sql=sql,
default_args=default_args)
dag.add_task(create_table)

'''
sql = "INSERT INTO deleteme SELECT 1;"
task = MySqlOperator(task_id='also_run_mysql', sql=sql, **default_args)
task = MySqlOperator(task_id='also_run_mysql',
sql=sql,
default_args=default_args)
dag.add_task(task)
task.set_downstream(run_this_last)
task.set_upstream(create_table)

'''
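
The example DAG now hands shared arguments to each operator through an explicit default_args= keyword instead of splatting them with **default_args, and the executor is no longer bound to the DAG at construction time. A condensed sketch of the new pattern, using only constructs that appear in the diff above (the dag_id is illustrative):

from datetime import datetime

from airflow.models import DAG
from airflow.operators import BashOperator

default_args = {
    'owner': 'max',
    'start_date': datetime(2014, 11, 1),
}

dag = DAG(dag_id='example_minimal')

run_first = BashOperator(
    task_id='run_first', bash_command='echo 1',
    default_args=default_args)
run_last = BashOperator(
    task_id='run_last', bash_command='echo done',
    default_args=default_args)

dag.add_task(run_first)
dag.add_task(run_last)
run_first.set_downstream(run_last)
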
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion airflow/executors/__init__.py
@@ -1,6 +1,6 @@
import logging

from airflow.settings import conf
from airflow.configuration import conf
from airflow.executors.local_executor import LocalExecutor
from airflow.executors.celery_executor import CeleryExecutor
from airflow.executors.sequential_executor import SequentialExecutor
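The only change to airflow/executors/__init__.py is where conf comes from: airflow.configuration instead of airflow.settings. The hunk does not show how the module maps the [core] executor option onto DEFAULT_EXECUTOR; the selection logic below is an assumption sketched for illustration, not the committed code:

from airflow.configuration import conf
from airflow.executors.local_executor import LocalExecutor
from airflow.executors.celery_executor import CeleryExecutor
from airflow.executors.sequential_executor import SequentialExecutor

# Hypothetical mapping from the config option to a default executor instance.
_EXECUTOR = conf.get('core', 'EXECUTOR')
if _EXECUTOR == 'LocalExecutor':
    DEFAULT_EXECUTOR = LocalExecutor()
elif _EXECUTOR == 'CeleryExecutor':
    DEFAULT_EXECUTOR = CeleryExecutor()
else:
    DEFAULT_EXECUTOR = SequentialExecutor()
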
2 changes: 1 addition & 1 deletion airflow/executors/celery_executor.py
@@ -22,7 +22,7 @@ class CeleryConfig(object):
CELERY_ACKS_LATE = True
BROKER_URL = conf.get('celery', 'BROKER_URL')
CELERY_RESULT_BACKEND = conf.get('celery', 'CELERY_RESULT_BACKEND')
CELERYD_CONCURRENCY = int(conf.get('celery', 'CELERYD_CONCURRENCY'))
CELERYD_CONCURRENCY = conf.getint('celery', 'CELERYD_CONCURRENCY')

app = Celery(
conf.get('celery', 'CELERY_APP_NAME'),
1 change: 0 additions & 1 deletion airflow/executors/local_executor.py
@@ -3,7 +3,6 @@
import subprocess
import time

from airflow.configuration import getconf
from airflow.utils import State

from airflow.executors.base_executor import BaseExecutor
4 changes: 0 additions & 4 deletions airflow/hooks/hive_hook.py
@@ -1,16 +1,12 @@
import logging
import json
import subprocess
import sys
from tempfile import NamedTemporaryFile

from airflow.models import Connection
from airflow.configuration import conf
from airflow import settings

# Adding the Hive python libs to python path
sys.path.insert(0, conf.get('hooks', 'HIVE_HOME_PY'))

from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
27 changes: 18 additions & 9 deletions airflow/models.py
@@ -23,10 +23,12 @@
from airflow import utils
from airflow.utils import State
from airflow.utils import apply_defaults
from flask_login import current_user

Base = declarative_base()
ID_LEN = conf.getint('misc', 'ID_LEN')
SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN')
if 'mysql' in SQL_ALCHEMY_CONN:
Text = LONGTEXT

class DagBag(object):
"""
@@ -41,15 +43,22 @@ class DagBag(object):
def __init__(
self,
dag_folder=None,
executor=DEFAULT_EXECUTOR):
executor=DEFAULT_EXECUTOR,
include_examples=True):
if not dag_folder:
dag_folder = conf.get('core', 'DAGS_FOLDER')
logging.info("Filling up the DagBag from " + dag_folder)
self.dag_folder = dag_folder
self.dags = {}
self.file_last_changed = {}
self.executor = executor
self.collect_dags()
self.collect_dags(dag_folder)
if include_examples:
example_dag_folder = os.path.join(
os.path.dirname(__file__),
'example_dags')
self.collect_dags(example_dag_folder)


def process_file(self, filepath, only_if_updated=True):
"""
@@ -87,15 +96,15 @@ def process_file(self, filepath, only_if_updated=True):

self.file_last_changed[filepath] = dttm

def collect_dags(self, only_if_updated=True):
def collect_dags(self, dag_folder, only_if_updated=True):
"""
Given a file path or a folder, this method looks for python modules,
imports them and adds them to the dagbag collection.
"""
if os.path.isfile(self.dag_folder):
self.process_file(self.dag_folder)
elif os.path.isdir(self.dag_folder):
for root, dirs, files in os.walk(self.dag_folder):
if os.path.isfile(dag_folder):
self.process_file(dag_folder)
elif os.path.isdir(dag_folder):
for root, dirs, files in os.walk(dag_folder):
for f in files:
filepath = root + '/' + f
self.process_file(filepath)
@@ -186,7 +195,7 @@ class DagPickle(Base):
the database.
"""
id = Column(Integer, primary_key=True)
pickle = Column(LONGTEXT())
pickle = Column(Text())

__tablename__ = "dag_pickle"

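Two behavior changes land in models.py: DagBag grows an include_examples flag that, when true, also collects the DAGs shipped under airflow/example_dags, and the pickle column only falls back to LONGTEXT when SQL_ALCHEMY_CONN points at MySQL. A small usage sketch of the new flag (the folder path is a placeholder):

import logging

from airflow.models import DagBag

# Load only the DAGs in a local folder and skip the bundled examples.
bag = DagBag(dag_folder='/path/to/dags', include_examples=False)
logging.info("Loaded DAG ids: " + str(bag.dags.keys()))
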
12 changes: 10 additions & 2 deletions airflow/settings.py
@@ -2,7 +2,6 @@

from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import create_engine
from sqlalchemy import exc

from airflow.configuration import conf

@@ -18,11 +17,20 @@
BASE_FOLDER = conf.get('core', 'BASE_FOLDER')
BASE_LOG_URL = "/admin/airflow/log"
SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN')
print "-"* 100
print(SQL_ALCHEMY_CONN)
print "-"* 100
if BASE_FOLDER not in sys.path:
sys.path.append(BASE_FOLDER)

engine_args = {}
if 'sqlite' not in SQL_ALCHEMY_CONN:
# Engine args not supported by sqlite
engine_args['pool_size'] = 50
engine_args['pool_recycle'] = 3600

engine = create_engine(
SQL_ALCHEMY_CONN, pool_size=50, pool_recycle=3600)
SQL_ALCHEMY_CONN, **engine_args)
Session = scoped_session(sessionmaker(autocommit=False,
autoflush=False,
bind=engine))
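settings.py now builds the engine keyword arguments conditionally, because the SQLite dialect's default pool does not accept pool_size or pool_recycle. A standalone sketch of the same guard, with a placeholder connection string in place of the value read from conf:

from sqlalchemy import create_engine

SQL_ALCHEMY_CONN = 'sqlite:////tmp/airflow.db'  # placeholder; normally conf.get('core', 'SQL_ALCHEMY_CONN')

engine_args = {}
if 'sqlite' not in SQL_ALCHEMY_CONN:
    # Pooling arguments only apply to server-backed databases such as MySQL.
    engine_args['pool_size'] = 50
    engine_args['pool_recycle'] = 3600

engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
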
4 changes: 4 additions & 0 deletions airflow/utils.py
@@ -1,9 +1,11 @@
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import errno
from functools import wraps
import inspect
import logging
import os
import re
import smtplib

@@ -187,3 +189,5 @@ def send_email(to, subject, html_content):
logging.info("Sent an altert email to " + str(to))
s.sendmail(SMTP_MAIL_FROM, to, msg.as_string())
s.quit()

