integrating config parser to read config values from a file
Krishna Puttaswamy committed Nov 7, 2014
1 parent cc94aa0 commit 0a673b4
Showing 16 changed files with 164 additions and 83 deletions.
31 changes: 31 additions & 0 deletions airflow/airflow.cfg
@@ -0,0 +1,31 @@
[core]
AIRFLOW_HOME: /Users/krishna_puttaswamy/workspace/Airflow
BASE_LOG_FOLDER: %(AIRFLOW_HOME)s/logs
DAGS_FOLDER: %(AIRFLOW_HOME)s/dags
BASE_FOLDER: %(AIRFLOW_HOME)s/airflow

[server]
WEB_SERVER_HOST: 0.0.0.0
WEB_SERVER_PORT: 8080

[smtp]
SMTP_HOST: 'localhost'
SMTP_PORT: 25
SMTP_PASSWORD: None
SMTP_MAIL_FROM: '[email protected]'

[celery]
CELERY_APP_NAME: airflow.executors.celery_worker
CELERY_BROKER: amqp
CELERY_RESULTS_BACKEND: amqp://

[hooks]
HIVE_HOME_PY: '/usr/lib/hive/lib/py'
PRESTO_DEFAULT_DBID: presto_default
HIVE_DEFAULT_DBID: hive_default

[misc]
RUN_AS_MASTER: True
JOB_HEARTBEAT_SEC: 5
# Used for dag_id and task_id VARCHAR length
ID_LEN: 250
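
The [core] values above rely on ConfigParser interpolation: %(AIRFLOW_HOME)s is expanded from the AIRFLOW_HOME option in the same section when a value is read. A minimal standalone sketch of that behavior, not part of the commit, assuming the file above is saved as airflow.cfg in the working directory:

from ConfigParser import ConfigParser  # Python 2 stdlib, as used by this commit

config = ConfigParser()
config.read('airflow.cfg')
# %(AIRFLOW_HOME)s in BASE_LOG_FOLDER is substituted from the [core] section:
print(config.get('core', 'BASE_LOG_FOLDER'))
# -> /Users/krishna_puttaswamy/workspace/Airflow/logs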
29 changes: 17 additions & 12 deletions airflow/bin/airflow
@@ -1,5 +1,6 @@
#!/usr/bin/env python

+from airflow import configuration
from airflow import settings
from airflow.models import DagBag, TaskInstance, DagPickle, State

@@ -17,7 +18,7 @@ from sqlalchemy import func
# Common help text across subcommands
mark_success_help = "Mark jobs as succeeded without running them"
subdir_help = "File location or directory from which to look for the dag"

+config = configuration.get_config()

def backfill(args):
logging.basicConfig(level=logging.INFO)
@@ -44,15 +45,16 @@ def backfill(args):
def run(args):

# Setting up logging
-directory = settings.BASE_LOG_FOLDER + \
+directory = config.get('core', 'BASE_LOG_FOLDER') + \
"/{args.dag_id}/{args.task_id}".format(args=args)
if not os.path.exists(directory):
os.makedirs(directory)
args.execution_date = dateutil.parser.parse(args.execution_date)
iso = args.execution_date.isoformat()
filename = "{directory}/{iso}".format(**locals())
logging.basicConfig(
-filename=filename, level=logging.INFO, format=settings.LOG_FORMAT)
+filename=filename, level=logging.INFO,
+format=settings.LOG_FORMAT)
print("Logging into: " + filename)

if not args.pickle:
Expand Down Expand Up @@ -106,10 +108,11 @@ def clear(args):


def webserver(args):
-logging.basicConfig(level=logging.DEBUG, format=settings.LOG_FORMAT)
+logging.basicConfig(level=logging.DEBUG,
+format=settings.LOG_FORMAT)
print(settings.HEADER)
from www.app import app
print("Starting the web server on port {0}.".format(args.port))
print("Starting the web server on port {0} and host {1}.".format(args.port, args.hostname))
app.run(debug=True, port=args.port, host=args.hostname)


@@ -251,7 +254,7 @@ if __name__ == '__main__':
action="store_true")
parser_backfill.add_argument(
"-sd", "--subdir", help=subdir_help,
-default=settings.DAGS_FOLDER)
+default=config.get('core', 'DAGS_FOLDER'))
parser_backfill.set_defaults(func=backfill)

ht = "Clear a set of task instance, as if they never ran"
@@ -272,7 +275,7 @@
"-d", "--downstream", help=ht, action="store_true")
parser_clear.add_argument(
"-sd", "--subdir", help=subdir_help,
-default=settings.DAGS_FOLDER)
+default=config.get('core', 'DAGS_FOLDER'))
parser_clear.set_defaults(func=clear)

ht = "Run a single task instance"
@@ -282,7 +285,8 @@
parser_run.add_argument(
"execution_date", help="The execution date to run")
parser_run.add_argument(
"-sd", "--subdir", help=subdir_help, default=settings.DAGS_FOLDER)
"-sd", "--subdir", help=subdir_help,
default=config.get('core', 'DAGS_FOLDER'))
parser_run.add_argument(
"-m", "--mark_success", help=mark_success_help, action="store_true")
parser_run.add_argument(
@@ -298,18 +302,18 @@
help="Serialized pickle object of the entire dag (used internally)")
parser_run.set_defaults(func=run)

ht = "Start a Flux webserver instance"
ht = "Start a Airflow webserver instance"
parser_webserver = subparsers.add_parser('webserver', help=ht)
parser_webserver.add_argument(
"-p", "--port",
-default=settings.WEB_SERVER_PORT,
+default=config.get('server', 'WEB_SERVER_PORT'),
type=int,
help="Set the port on which to run the web server")
parser_webserver.set_defaults(func=webserver)

parser_webserver.add_argument(
"-hn", "--hostname",
-default=settings.WEB_SERVER_HOST,
+default=config.get('server', 'WEB_SERVER_HOST'),
help="Set the hostname on which to run the web server")
parser_webserver.set_defaults(func=webserver)

@@ -318,7 +322,8 @@
parser_master.add_argument(
"-d", "--dag_id", help="The id of the dag to run")
parser_master.add_argument(
"-sd", "--subdir", help=subdir_help, default=settings.DAGS_FOLDER)
"-sd", "--subdir", help=subdir_help,
default=config.get('core', 'DAGS_FOLDER'))
parser_master.set_defaults(func=master)

ht = "Initialize and reset the metadata database"
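Net effect of the changes in this file: every argparse default that used to be a module-level settings constant is now resolved through the shared config object built at import time. A hedged sketch of that lookup pattern, using only sections and keys from the airflow.cfg above (the diff itself pairs config.get with argparse's type=int for the port):

from airflow import configuration

config = configuration.get_config()
dags_folder = config.get('core', 'DAGS_FOLDER')        # '/.../Airflow/dags'
web_host = config.get('server', 'WEB_SERVER_HOST')     # '0.0.0.0'
web_port = config.getint('server', 'WEB_SERVER_PORT')  # getint casts '8080' to 8080
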
57 changes: 57 additions & 0 deletions airflow/configuration.py
@@ -0,0 +1,57 @@
import logging
import os

from ConfigParser import ConfigParser, NoOptionError, NoSectionError

class AirflowConfigParser(ConfigParser):
    NO_DEFAULT = object()
    _instance = None
    _config_paths = ['airflow.cfg']
    if 'AIRFLOW_CONFIG_PATH' in os.environ:
        _config_paths.append(os.environ['AIRFLOW_CONFIG_PATH'])
    logging.info("Config paths are " + str(_config_paths))
    print("Config paths are " + str(_config_paths))

    @classmethod
    def add_config_paths(cls, path):
        cls._config_paths.append(path)
        cls.reload()

    @classmethod
    def instance(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = cls(*args, **kwargs)
            cls._instance.reload()

        return cls._instance

    @classmethod
    def reload(cls):
        loaded_obj = cls.instance().read(cls._config_paths)
        logging.info("the config object after loading is " + str(loaded_obj))
        return loaded_obj

    def get_with_default(self, method, section, option, default):
        try:
            return method(self, section, option)
        except (NoOptionError, NoSectionError):
            if default is AirflowConfigParser.NO_DEFAULT:
                raise
            return default

    def get(self, section, option, default=NO_DEFAULT):
        return self.get_with_default(ConfigParser.get, section, option, default)

    def getint(self, section, option, default=NO_DEFAULT):
        return self.get_with_default(ConfigParser.getint, section, option, default)

    def getboolean(self, section, option, default=NO_DEFAULT):
        return self.get_with_default(ConfigParser.getboolean, section, option, default)

    def set(self, section, option, value):
        if not ConfigParser.has_section(self, section):
            ConfigParser.add_section(self, section)
        return ConfigParser.set(self, section, option, value)

def get_config():
    return AirflowConfigParser.instance()
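
AirflowConfigParser is a process-wide singleton: instance() builds it once, reload() reads every path in _config_paths in order (so a file named by AIRFLOW_CONFIG_PATH overrides the local airflow.cfg), and the get/getint/getboolean overrides add an optional default instead of always raising. A brief usage sketch; the MISSING_KEY option and [scratch] section are made up for illustration:

from airflow import configuration

config = configuration.get_config()                      # singleton AirflowConfigParser
home = config.get('core', 'AIRFLOW_HOME')                # raises NoSectionError/NoOptionError if absent
tmp = config.get('core', 'MISSING_KEY', default='/tmp')  # falls back instead of raising
config.set('scratch', 'SOME_FLAG', 'True')               # set() creates the [scratch] section on demand
config.getboolean('scratch', 'SOME_FLAG')                # -> True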
4 changes: 2 additions & 2 deletions airflow/executors/celery_executor.py
@@ -2,7 +2,7 @@
import time

from airflow.executors.base_executor import BaseExecutor
-from airflow import settings
+from airflow import configuration
from airflow.utils import State
from celery_worker import execute_command

@@ -52,7 +52,7 @@ def run(self):
# Received poison pill, no more tasks to run
self.task_queue.task_done()
break
-BASE_FOLDER = settings.BASE_FOLDER
+BASE_FOLDER = configuration.get_config().get('core', 'BASE_FOLDER')
command = (
"exec bash -c '"
"cd $AIRFLOW_HOME;\n" +
13 changes: 7 additions & 6 deletions airflow/executors/celery_worker.py
@@ -1,18 +1,19 @@
import subprocess
import logging
-from celery import Celery

-from airflow import settings
+from airflow import configuration
+from celery import Celery

# to start the celery worker, run the command:
# "celery -A airflow.executors.celery_worker worker --loglevel=info"

-# app = Celery('airflow.executors.celery_worker', backend='amqp', broker='amqp://')
-app = Celery(
-settings.CELERY_APP_NAME,
-backend=settings.CELERY_BROKER,
-broker=settings.CELERY_RESULTS_BACKEND)
+config = configuration.get_config()
+
+app = Celery(
+config.get('celery', 'CELERY_APP_NAME'),
+backend=config.get('celery', 'CELERY_BROKER'),
+broker=config.get('celery', 'CELERY_RESULTS_BACKEND'))

@app.task(name='airflow.executors.celery_worker.execute_command')
def execute_command(command):
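Because execute_command is registered as a named Celery task wrapping subprocess, any process that imports it can push shell commands to a worker started with the command in the comment above. A hedged round-trip example using standard Celery API; 'echo 1' is an arbitrary placeholder command:

from airflow.executors.celery_worker import execute_command

async_result = execute_command.delay("echo 1")  # enqueue the command on the broker
async_result.get(timeout=60)                    # block until the worker's subprocess exits
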
4 changes: 2 additions & 2 deletions airflow/executors/local_executor.py
@@ -3,7 +3,7 @@
import subprocess
import time

-from airflow import settings
+from airflow import configuration
from airflow.utils import State

from airflow.executors.base_executor import BaseExecutor
@@ -24,7 +24,7 @@ def run(self):
# Received poison pill, no more tasks to run
self.task_queue.task_done()
break
-BASE_FOLDER = settings.BASE_FOLDER
+BASE_FOLDER = configuration.get_config().get('core', 'BASE_FOLDER')
print command
command = (
"exec bash -c '"
6 changes: 4 additions & 2 deletions airflow/hooks/hive_hook.py
@@ -2,10 +2,11 @@
import subprocess
import sys
from airflow.models import DatabaseConnection
+from airflow import configuration
from airflow import settings

# Adding the Hive python libs to python path
-sys.path.insert(0, settings.HIVE_HOME_PY)
+sys.path.insert(0, configuration.get_config().get('hooks', 'HIVE_HOME_PY'))

from thrift.transport import TSocket
from thrift.transport import TTransport
@@ -16,7 +17,8 @@


class HiveHook(BaseHook):
-def __init__(self, hive_dbid=settings.HIVE_DEFAULT_DBID):
+def __init__(self,
+hive_dbid=configuration.get_config().get('hooks', 'HIVE_DEFAULT_DBID')):
session = settings.Session()
db = session.query(
DatabaseConnection).filter(
3 changes: 2 additions & 1 deletion airflow/hooks/presto/presto_hook.py
@@ -1,6 +1,7 @@
import subprocess

from airflow import settings
+from airflow import configuration
from airflow.models import DatabaseConnection
from airflow.hooks.base_hook import BaseHook
from airflow.hooks.presto.presto_client import PrestoClient
@@ -12,7 +13,7 @@ class PrestoHook(BaseHook):
"""
Interact with Presto!
"""
-def __init__(self, presto_dbid=settings.PRESTO_DEFAULT_DBID):
+def __init__(self, presto_dbid=configuration.get_config().get('hooks', 'PRESTO_DEFAULT_DBID')):
session = settings.Session()
db = session.query(
DatabaseConnection).filter(
5 changes: 3 additions & 2 deletions airflow/macros/hive.py
@@ -1,8 +1,9 @@
-from airflow import settings
+from airflow import configuration


def max_partition(
table, schema="default", hive_dbid=settings.HIVE_DEFAULT_DBID):
table, schema="default",
hive_dbid=configuration.get_config().get('hooks', 'HIVE_DEFAULT_DBID')):
from airflow.hooks.hive_hook import HiveHook
hh = HiveHook(hive_dbid=hive_dbid)
return hh.max_partition(schema=schema, table=table)
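
With this change, max_partition resolves its default hive_dbid from the [hooks] section of the config file instead of a settings constant. A minimal usage sketch, assuming a 'hive_default' DatabaseConnection exists in the metadata database and the Hive metastore is reachable ('web_logs' is a made-up table):

from airflow.macros.hive import max_partition

latest = max_partition(table='web_logs', schema='default')  # delegates to HiveHook.max_partition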