Skip to content

Commit

Permalink
Moving folders around
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Oct 8, 2014
1 parent 86e4c01 commit 1f890d0
Show file tree
Hide file tree
Showing 51 changed files with 70 additions and 83 deletions.
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,16 @@ A task instance represents a task run, for a specific point in time. While the t
Installation
------------
##### Debian packages
sudo apt-get install virtualenv python-dev
sudo apt-get install libmysqlclient-dev mysql-server
sudo apt-get g++
sudo apt-get install virtualenv python-dev
sudo apt-get install libmysqlclient-dev mysql-server
sudo apt-get g++
##### Required environment variable, add this to your .bashrc
export FLUX_HOME=~/Flux
##### Create a python virtualenv
virtualenv env # creates the environment
source init.sh # activates the environment
virtualenv env # creates the environment
source init.sh # activates the environment
##### Use pip to install the python packages required by Flux
pip install -r requirements.txt
pip install -r requirements.txt
##### Setup the metdata database
Here are steps to get started using MySQL as a backend for the metadata database, though any backend supported by SqlAlquemy should work just fine.

Expand Down
14 changes: 1 addition & 13 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ TODO
-----
#### UI
* Tree view: remove dummy root node
* Graph view add tooltip
* Backfill wizard
* Fix datepicker

#### Command line
* Add support for including upstream and downstream
* backfill: add support for including upstream and downstream
#### Write unittests
* For each existing operator
#### More Operators!
Expand All @@ -22,23 +21,12 @@ TODO
* Previous execution timestamp
* ...
#### Backend
* LocalExecutor, ctrl-c should result in error state instead of forever running
* CeleryExecutor
* Clear should kill running jobs
#### Misc
* Require and use $FLUX_HOME
* Rename core to flux
* BaseJob
* DagBackfillJob
* TaskIntanceJob
* ClearJob?
* Write an hypervisor, looks for dead jobs without a heartbeat and kills
* Authentication with Flask-Login and Flask-Principal
* email_on_retry

#### Testing required
* result queue with remove descendants

#### Wishlist
* Jobs can send pickles of local version to remote executors?
* Support for cron like synthax (0 * * * ) using croniter library
4 changes: 2 additions & 2 deletions dags/examples/example1.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from core.operators import BashOperator, MySqlOperator, DummyOperator
from core.models import DAG
from flux.operators import BashOperator, MySqlOperator, DummyOperator
from flux.models import DAG
from datetime import datetime

default_args = {
Expand Down
4 changes: 2 additions & 2 deletions dags/examples/example2.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from core.operators import BashOperator
from core.models import DAG
from flux.operators import BashOperator
from flux.models import DAG
from datetime import datetime

default_args = {
Expand Down
4 changes: 2 additions & 2 deletions dags/examples/example3.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from core.operators import BashOperator, MySqlOperator
from core.models import DAG
from flux.operators import BashOperator, MySqlOperator
from flux.models import DAG
from datetime import datetime

default_args = {
Expand Down
4 changes: 2 additions & 2 deletions dags/examples/hourly.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from core import DAG
from flux import DAG
from datetime import datetime, timedelta
from core.operators import BashOperator
from flux.operators import BashOperator

default_args = {
'owner': 'max',
Expand Down
6 changes: 3 additions & 3 deletions dags/examples/simple.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from core.operators import MySqlOperator
from core.executors import SequentialExecutor
from core import DAG
from flux.operators import MySqlOperator
from flux.executors import SequentialExecutor
from flux import DAG
from datetime import datetime

# Setting some default operator parameters
Expand Down
4 changes: 2 additions & 2 deletions dags/examples/test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from core.operators import BashOperator
from core.models import DAG
from flux.operators import BashOperator
from flux.models import DAG
from datetime import datetime, timedelta

default_args = {
Expand Down
2 changes: 1 addition & 1 deletion dags/examples/test_signal.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import core as flux
import flux as flux
from datetime import datetime

default_args = {
Expand Down
6 changes: 3 additions & 3 deletions db_reset.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from core import models
from core import settings
from core.hooks import MySqlHook
from flux import models
from flux import settings
from flux.hooks import MySqlHook
import logging

if __name__ == '__main__':
Expand Down
5 changes: 2 additions & 3 deletions flux
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#!/bin/bash
BASEDIR=$(dirname $0)
export PYTHONPATH=$BASEDIR/Flux
python core/bin/flux.py $*
source $FLUX_HOME/init.sh
python src/flux/bin/flux_bin.py $*
4 changes: 2 additions & 2 deletions init.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/bash
source env/bin/activate
export PYTHONPATH=~/Flux
source $FLUX_HOME/env/bin/activate
export PYTHONPATH=$FLUX_HOME/src
File renamed without changes.
18 changes: 6 additions & 12 deletions core/bin/flux.py → src/flux/bin/flux_bin.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
#!python

from flux import settings
from flux.models import DagBag, TaskInstance, DagPickle, State

import dateutil.parser
from datetime import datetime
import logging
import os
import pickle
import signal
import sys
from time import sleep


BASE_FOLDER = os.path.abspath(__file__ + "/../../../")
if BASE_FOLDER not in sys.path:
sys.path.append(BASE_FOLDER)
import argparse
from core import settings
from core.models import DagBag
from core.models import DagPickle
from core.models import TaskInstance
from core.models import State
from datetime import datetime
from sqlalchemy import func

# Common help text across subcommands
Expand Down Expand Up @@ -149,7 +143,7 @@ def master(args):
if task.task_id not in ti_dict:
# Brand new task, let's get started
ti = TI(task, task.start_date)
executor.queue_command(ti.key, ti.command(mark_success))
executor.queue_command(ti.key, ti.command())
else:
ti = ti_dict[task.task_id]
ti.task = task # Hacky but worky
Expand All @@ -158,7 +152,7 @@ def master(args):
# the retry delay is met
if ti.is_runnable():
executor.queue_command(
ti.key, ti.command(mark_success))
ti.key, ti.command())
elif ti.state == State.RUNNING:
# Only one task at a time
continue
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import logging
import time

from core import settings
from core.utils import State
from flux import settings
from flux.utils import State


class BaseExecutor(object):
Expand Down Expand Up @@ -73,7 +73,7 @@ def run(self):
BASE_FOLDER = settings.BASE_FOLDER
command = (
"exec bash -c '"
"cd {BASE_FOLDER};\n" +
"cd $FLUX_HOME;\n" +
"source init.sh;\n" +
command +
"'"
Expand Down
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions core/hooks/mysql_hook.py → src/flux/hooks/mysql_hook.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import MySQLdb
from core import settings
from core.models import DatabaseConnection
from flux import settings
from flux.models import DatabaseConnection


class MySqlHook(object):
Expand Down
File renamed without changes.
12 changes: 8 additions & 4 deletions core/models.py → src/flux/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from core import macros
from core.executors import DEFAULT_EXECUTOR
from . import settings
from . import utils
from flux import macros
from flux.executors import DEFAULT_EXECUTOR
from flux import settings
from flux import utils
from settings import ID_LEN
import socket
from utils import State
Expand Down Expand Up @@ -178,11 +178,15 @@ def command(self, mark_success=False, pickle=None):
iso = self.execution_date.isoformat()
mark_success = "--mark_success" if mark_success else ""
pickle = "--pickle {0}".format(pickle.id) if pickle else ""
subdir = ""
if not pickle and self.dag and self.dag.filepath:
subdir = "-sd {self.task.dag.filepath}"
return (
"./flux run "
"{self.dag_id} {self.task_id} {iso} "
"{mark_success} "
"{pickle} "
"{sub_dir} "
).format(**locals())

@property
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from datetime import datetime
from time import sleep

from core.models import BaseOperator
from core.hooks import MySqlHook
from flux.models import BaseOperator
from flux.hooks import MySqlHook


class BaseSensorOperator(BaseOperator):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from core.models import BaseOperator
from flux.models import BaseOperator
from subprocess import Popen, PIPE


Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from core.models import BaseOperator
from flux.models import BaseOperator


class DummyOperator(BaseOperator):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from core.models import BaseOperator
from core.hooks import MySqlHook
from flux.models import BaseOperator
from flux.hooks import MySqlHook


class MySqlOperator(BaseOperator):
Expand Down
12 changes: 8 additions & 4 deletions core/settings.py → src/flux/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

BASE_FOLDER = os.path.abspath(os.path.join(
os.path.dirname(os.path.realpath(__file__)), os.pardir))
if 'FLUX_HOME' not in os.environ:
raise Exception(
"Looks like someone forgot to set their FLUX_HOME env variable.")
FLUX_HOME = os.environ['FLUX_HOME']

BASE_FOLDER = FLUX_HOME + '/src/flux'
if BASE_FOLDER not in sys.path:
sys.path.append(BASE_FOLDER)
DAGS_FOLDER = BASE_FOLDER + '/dags'
BASE_LOG_FOLDER = BASE_FOLDER + "/logs"
DAGS_FOLDER = FLUX_HOME + '/dags'
BASE_LOG_FOLDER = FLUX_HOME + "/logs"
RUN_AS_MASTER = True
JOB_HEARTBEAT_SEC = 5
ID_LEN = 250 # Used for dag_id and task_id VARCHAR length
Expand Down
2 changes: 1 addition & 1 deletion core/utils.py → src/flux/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def validate_key(k, max_length=250):
elif not re.match(r'^[A-Za-z0-9_]+$', k):
raise Exception(
"The key has to be made of alphanumeric characters and "
"underscores exclusively")
"undersfluxs exclusively")
else:
return True

Expand Down
File renamed without changes.
12 changes: 6 additions & 6 deletions www/app.py → src/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
import markdown
import chartkick

from core.settings import Session
from core import models
from core.models import State
from core import settings
from core import utils
from flux.settings import Session
from flux import models
from flux.models import State
from flux import settings
from flux import utils

dagbag = models.DagBag(settings.DAGS_FOLDER)
session = Session()
Expand Down Expand Up @@ -54,7 +54,7 @@ class HomeView(AdminIndexView):
@expose("/")
def index(self):
md = "".join(
open(settings.BASE_FOLDER + '/README.md', 'r').readlines())
open(settings.FLUX_HOME + '/README.md', 'r').readlines())
content = Markup(markdown.markdown(md))
return self.render('admin/index.html', content=content)
admin = Admin(app, name="Flux", index_view=HomeView(name='Home'))
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes
File renamed without changes
File renamed without changes
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
4 changes: 0 additions & 4 deletions test.py

This file was deleted.

0 comments on commit 1f890d0

Please sign in to comment.