🔧 refactor ex0 #70

Draft
wants to merge 10 commits into base: master
24 changes: 13 additions & 11 deletions jobs/examples/ex0_extraction_job.py
@@ -1,22 +1,24 @@
""" Demo basic extraction job using a public datasource (from wikimedia) """
from yaetos.etl_utils import ETL_Base, Commandliner
from yaetos.etl_utils import ETLBase, Commandliner
import requests
import os
import pandas as pd
from pyspark import sql


class Job(ETL_Base):
def transform(self):
url = self.jargs.api_inputs['path']
class Job(ETLBase):
def transform(self) -> sql.DataFrame:
Contributor Author: typing on return

Owner: 💯 Nice

url = self.jargs.api_inputs["path"]
resp = requests.get(url, allow_redirects=True)
self.logger.info('Finished reading file from {}.'.format(url))
self.logger.info("Finished reading file from {}.".format(url))

# Save to local
tmp_dir = 'tmp'
os.makedirs(tmp_dir, exist_ok = True)
local_path = tmp_dir+'/tmp_file.csv.gz'
open(local_path, 'wb').write(resp.content) # creating local copy, necessary for sc_sql.read.csv, TODO: check to remove local copy step.
self.logger.info('Copied file locally at {}.'.format(local_path))
os.makedirs(tmp_dir, exist_ok=True)
Owner: this would break since the tmp_dir = 'tmp' line is deleted above.
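A minimal fix could look like this (a sketch; it assumes keeping the tmp_dir variable rather than hard-coding the path):

tmp_dir = "tmp"  # keep the variable defined so the makedirs call below still works
os.makedirs(tmp_dir, exist_ok=True)
local_path = os.path.join(tmp_dir, "tmp_file.csv.gz")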

local_path = "tmp/tmp_file.csv.gz"
with open(local_path, "wb") as lp:
lp.write(resp.content)
# creating local copy, necessary for sc_sql.read.csv, TODO: check to remove local copy step.
self.logger.info(f"Copied file locally at {local_path}.")

# Save as dataframe
pdf = pd.read_csv(local_path)
@@ -25,5 +27,5 @@ def transform(self):


if __name__ == "__main__":
args = {'job_param_file': 'conf/jobs_metadata.yml'}
args = {"job_param_file": "conf/jobs_metadata.yml"}
Commandliner(Job, **args)
16 changes: 10 additions & 6 deletions jobs/examples/ex10_troubleshoot_job.py
@@ -1,23 +1,27 @@
"""To show troubleshooting, done through 'import ipdb; ipdb.set_trace()'."""
from yaetos.etl_utils import ETL_Base, Commandliner
from yaetos.etl_utils import ETLBase, Commandliner


class Job(ETL_Base):
class Job(ETLBase):
def transform(self, some_events, other_events):
df = self.query("""
df = self.query(
"""
SELECT se.session_id, count(*) as count_events
FROM some_events se
JOIN other_events oe on se.session_id=oe.session_id
WHERE se.action='searchResultPage' and se.n_results>0
group by se.session_id
order by count(*) desc
""")
"""
)

import ipdb; ipdb.set_trace() # will drop to python terminal here to inspect
import ipdb

ipdb.set_trace() # will drop to python terminal here to inspect

return df


if __name__ == "__main__":
args = {'job_param_file': 'conf/jobs_metadata.yml'}
args = {"job_param_file": "conf/jobs_metadata.yml"}
Commandliner(Job, **args)
22 changes: 13 additions & 9 deletions jobs/examples/ex1_frameworked_job.py
@@ -1,20 +1,24 @@
"""Same as ex1_full_sql_job.sql but allows access to spark for more complex ops (not used here but in ex2_frameworked_job.py)."""
from yaetos.etl_utils import ETL_Base, Commandliner
from yaetos.etl_utils import ETLBase, Commandliner
from pyspark import sql


class Job(ETL_Base):
def transform(self, some_events, other_events):
df = self.query("""
class Job(ETLBase):
def transform(
self, some_events="some_events", other_events="other_events"
Owner: The inputs have to be Spark dataframes or pandas dataframes. When they are Spark dataframes, the framework registers all of them in the Spark SQL environment so they can be used directly inside queries, like those in self.query().
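A sketch of the pre-existing pattern the comment describes (taken from the removed lines; it assumes the framework registers each input dataframe as a SQL view under its parameter name):

class Job(ETLBase):
    def transform(self, some_events, other_events) -> sql.DataFrame:
        # some_events and other_events arrive as Spark dataframes and are
        # already registered as views under those names, so the query can
        # reference them directly without string interpolation.
        return self.query("""
            SELECT se.session_id, count(*) as count_events
            FROM some_events se
            JOIN other_events oe on se.session_id=oe.session_id
            WHERE se.action='searchResultPage' and se.n_results>0
            group by se.session_id
            order by count(*) desc
        """)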

) -> sql.DataFrame:
return self.query(
f"""
SELECT se.session_id, count(*) as count_events
FROM some_events se
JOIN other_events oe on se.session_id=oe.session_id
FROM {some_events} se
JOIN {other_events} oe on se.session_id=oe.session_id
Owner: I like the idea. I have been thinking of doing something like that but haven't gotten to it yet. I think it would need to be done differently since the input variables are not expected to be strings.
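One way to parametrize the view names without turning the inputs into strings (a sketch only; the re-registration via createOrReplaceTempView is an assumption, not an existing framework feature):

class Job(ETLBase):
    def transform(self, some_events, other_events) -> sql.DataFrame:
        # Inputs stay as Spark dataframes; register them under explicit
        # aliases and interpolate only those locally defined names.
        se, oe = "se_view", "oe_view"
        some_events.createOrReplaceTempView(se)
        other_events.createOrReplaceTempView(oe)
        return self.query(f"""
            SELECT s.session_id, count(*) as count_events
            FROM {se} s
            JOIN {oe} o on s.session_id=o.session_id
            WHERE s.action='searchResultPage' and s.n_results>0
            group by s.session_id
            order by count(*) desc
        """)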

WHERE se.action='searchResultPage' and se.n_results>0
group by se.session_id
order by count(*) desc
""")
return df
"""
)


if __name__ == "__main__":
args = {'job_param_file': 'conf/jobs_metadata.yml'}
args = {"job_param_file": "conf/jobs_metadata.yml"}
Commandliner(Job, **args)
6 changes: 3 additions & 3 deletions jobs/examples/ex1_raw_job.py
@@ -22,14 +22,14 @@


# Start SparkContext
sc = SparkContext(appName='ex1_raw_job')
sc = SparkContext(appName="ex1_raw_job")
sc_sql = SQLContext(sc)

# Load data from S3 bucket
some_events = sc_sql.read.csv(input_some_events, header=True)
some_events.createOrReplaceTempView('some_events')
some_events.createOrReplaceTempView("some_events")
other_events = sc_sql.read.csv(input_other_events, header=True)
other_events.createOrReplaceTempView('other_events')
other_events.createOrReplaceTempView("other_events")

# Calculate word counts
query_str = """
43 changes: 17 additions & 26 deletions jobs/examples/ex2_frameworked_job.py
@@ -1,30 +1,35 @@
from yaetos.etl_utils import ETL_Base, Commandliner
from yaetos.etl_utils import ETLBase, Commandliner
from pyspark.sql.functions import udf, array
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import col
from pyspark import sql
from datetime import datetime


class Job(ETL_Base):
def transform(self, some_events, other_events):
class Job(ETLBase):
def transform(self, some_events, other_events) -> sql.DataFrame:
"""For demo only. Functional but no specific business logic."""

df = self.query("""
df = self.query(
"""
SELECT se.timestamp, se.session_id, se.group, se.action
FROM some_events se
JOIN other_events oe on se.session_id=oe.session_id
WHERE se.action='searchResultPage' and se.n_results>0
""")
"""
)

udf_format_datetime = udf(self.format_datetime, StringType())

events_cleaned = df \
.withColumn('timestamp_obj', udf_format_datetime(df.timestamp).cast("timestamp")) \
.where(col('timestamp').like("%2.016%") == False)
events_cleaned = df.withColumn(
"timestamp_obj", udf_format_datetime(df.timestamp).cast("timestamp")
).where(col("timestamp").like("%2.016%") == False)

events_cleaned.createOrReplaceTempView("events_cleaned")

self.sc_sql.registerFunction("date_diff_sec", self.date_diff_sec, IntegerType())
output = self.query("""
output = self.query(
"""
WITH
session_times as (
SELECT timestamp, timestamp_obj, session_id, group, action,
@@ -44,25 +49,11 @@ def transform(self, some_events, other_events):
select *
from session_grouped
order by delta_sec desc, first_timestamp
""")
"""
)
return output

@staticmethod
def format_datetime(wiki_dt):
dt = {}
dt['year'] = wiki_dt[:4]
dt['month'] = wiki_dt[4:6]
dt['day'] = wiki_dt[6:8]
dt['hour'] = wiki_dt[8:10]
dt['minute'] = wiki_dt[10:12]
dt['sec'] = wiki_dt[12:14]
return '{year}-{month}-{day} {hour}:{minute}:{sec}'.format(**dt)

@staticmethod
def date_diff_sec(x,y):
return int((y-x).total_seconds())
Owner: These 2 functions above are needed for the transform to work.
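For reference, a sketch of those two helpers restored from the removed lines (reformatted to the new quoting style):

    @staticmethod
    def format_datetime(wiki_dt):
        # Turn a 14-character wikimedia timestamp like "20160301103055"
        # into "2016-03-01 10:30:55".
        dt = {
            "year": wiki_dt[:4],
            "month": wiki_dt[4:6],
            "day": wiki_dt[6:8],
            "hour": wiki_dt[8:10],
            "minute": wiki_dt[10:12],
            "sec": wiki_dt[12:14],
        }
        return "{year}-{month}-{day} {hour}:{minute}:{sec}".format(**dt)

    @staticmethod
    def date_diff_sec(x, y):
        # Difference between two timestamps, in whole seconds.
        return int((y - x).total_seconds())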



if __name__ == "__main__":
args = {'job_param_file': 'conf/jobs_metadata.yml'}
args = {"job_param_file": "conf/jobs_metadata.yml"}
Commandliner(Job, **args)
17 changes: 10 additions & 7 deletions jobs/examples/ex3_incremental_job.py
@@ -1,16 +1,19 @@
from yaetos.etl_utils import ETL_Base, Commandliner
from yaetos.etl_utils import ETLBase, Commandliner
from pyspark import sql


class Job(ETL_Base):
def transform(self, processed_events):
df = self.query("""
class Job(ETLBase):
def transform(self, processed_events="processed_events") -> sql.DataFrame:
df = self.query(
f"""
SELECT timestamp_obj as other_timestamp, *
FROM processed_events se
FROM {processed_events} se
order by timestamp_obj
""")
"""
)
return df


if __name__ == "__main__":
args = {'job_param_file': 'conf/jobs_metadata.yml'}
args = {"job_param_file": "conf/jobs_metadata.yml"}
Commandliner(Job, **args)
24 changes: 7 additions & 17 deletions jobs/examples/ex3_incremental_prep_job.py
@@ -1,31 +1,21 @@
from yaetos.etl_utils import ETL_Base, Commandliner
from yaetos.etl_utils import ETLBase, Commandliner
from pyspark.sql.functions import udf, array
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import col


class Job(ETL_Base):
class Job(ETLBase):
def transform(self, some_events):

udf_format_datetime = udf(self.format_datetime, StringType())

events_cleaned = some_events \
.withColumn('timestamp_obj', udf_format_datetime(some_events.timestamp).cast("timestamp")) \
.where(col('timestamp').like("%2.016%") == False)
events_cleaned = some_events.withColumn(
"timestamp_obj",
udf_format_datetime(some_events.timestamp).cast("timestamp"),
).where(col("timestamp").like("%2.016%") == False)
return events_cleaned

@staticmethod
def format_datetime(wiki_dt):
dt = {}
dt['year'] = wiki_dt[:4]
dt['month'] = wiki_dt[4:6]
dt['day'] = wiki_dt[6:8]
dt['hour'] = wiki_dt[8:10]
dt['minute'] = wiki_dt[10:12]
dt['sec'] = wiki_dt[12:14]
return '{year}-{month}-{day} {hour}:{minute}:{sec}'.format(**dt)


if __name__ == "__main__":
args = {'job_param_file': 'conf/jobs_metadata.yml'}
args = {"job_param_file": "conf/jobs_metadata.yml"}
Commandliner(Job, **args)
21 changes: 10 additions & 11 deletions jobs/examples/ex4_dependency1_job.py
@@ -1,18 +1,17 @@
from yaetos.etl_utils import ETL_Base, Commandliner
from pyspark.sql.functions import udf, array
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import col
from yaetos.etl_utils import ETLBase, Commandliner
from pyspark import sql


class Job(ETL_Base):
def transform(self, some_events):
df = self.query("""
class Job(ETLBase):
def transform(self, some_events: str = "some_events") -> sql.DataFrame:
return self.query(
f"""
SELECT se.session_id, length(se.session_id) as session_length
FROM some_events se
""")
return df
FROM {some_events} se
"""
)


if __name__ == "__main__":
args = {'job_param_file': 'conf/jobs_metadata.yml'}
args = {"job_param_file": "conf/jobs_metadata.yml"}
Commandliner(Job, **args)
12 changes: 7 additions & 5 deletions jobs/examples/ex4_dependency2_job.py
@@ -1,18 +1,20 @@
from yaetos.etl_utils import ETL_Base, Commandliner
from yaetos.etl_utils import ETLBase, Commandliner
from pyspark.sql.functions import udf, array
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import col


class Job(ETL_Base):
class Job(ETLBase):
def transform(self, some_events):
df = self.query("""
df = self.query(
"""
SELECT se.session_id, session_length, session_length*2 as doubled_length
FROM some_events se
""")
"""
)
return df


if __name__ == "__main__":
args = {'job_param_file': 'conf/jobs_metadata.yml'}
args = {"job_param_file": "conf/jobs_metadata.yml"}
Commandliner(Job, **args)
12 changes: 7 additions & 5 deletions jobs/examples/ex4_dependency4_job.py
@@ -1,18 +1,20 @@
from yaetos.etl_utils import ETL_Base, Commandliner
from yaetos.etl_utils import ETLBase, Commandliner
from pyspark.sql.functions import udf, array
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import col


class Job(ETL_Base):
class Job(ETLBase):
def transform(self, some_events):
df = self.query("""
df = self.query(
"""
SELECT * , session_length*8 as D
FROM some_events se
""")
"""
)
return df


if __name__ == "__main__":
args = {'job_param_file': 'conf/jobs_metadata.yml'}
args = {"job_param_file": "conf/jobs_metadata.yml"}
Commandliner(Job, **args)
18 changes: 10 additions & 8 deletions jobs/examples/ex5_copy_to_oracle_job.py
@@ -1,25 +1,27 @@
from yaetos.etl_utils import ETL_Base, Commandliner
from yaetos.etl_utils import ETLBase, Commandliner
from sqlalchemy import types


class Job(ETL_Base):
class Job(ETLBase):
OUTPUT_TYPES = {
'session_id': types.VARCHAR(16),
'count_events': types.INT(),
}
"session_id": types.VARCHAR(16),
"count_events": types.INT(),
}

def transform(self, some_events, other_events):
df = self.query("""
df = self.query(
"""
SELECT se.session_id, count(*) as count_events
FROM some_events se
JOIN other_events oe on se.session_id=oe.session_id
WHERE se.action='searchResultPage' and se.n_results>0
group by se.session_id
order by count(*) desc
""")
"""
)
return df


if __name__ == "__main__":
args = {'job_param_file': 'conf/jobs_metadata.yml'}
args = {"job_param_file": "conf/jobs_metadata.yml"}
Commandliner(Job, **args)