-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtasks_ml_prep.py
196 lines (147 loc) · 6.35 KB
/
tasks_ml_prep.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""Tasks which prepare data before their use in a model sweep including machine learning.
Tasks which prepare data before their use in a model sweep including machine learning. Unlike
tasks_preprocess, these steps are not specific to individual modeling tasks.
License:
BSD, see LICENSE.md
"""
import json
import os
import sqlite3
import luigi
import const
import tasks_auxiliary
import tasks_preprocess
import tasks_sql
class CheckMlPrepTask(luigi.Task):
    """Check that the views required for modeling data preparation are available.

    Reads the database location from the auxiliary check's JSON output, confirms that the
    auxiliary and summary views each report positive values for the nafta region in 2020, and
    then forwards the same job information as this task's own output.
    """

    task_dir = luigi.Parameter(default=const.DEFAULT_TASK_DIR)

    def requires(self):
        """Indicate that data preprocessing for plastics and auxiliary data are required."""
        return {
            'auxiliary': tasks_auxiliary.CheckViewsAuxTask(task_dir=self.task_dir),
            'preprocess': tasks_preprocess.CheckViewsTask(task_dir=self.task_dir)
        }

    def output(self):
        """Report that the prerequisite views have been checked."""
        out_path = os.path.join(self.task_dir, '200_check_prep.json')
        return luigi.LocalTarget(out_path)

    def run(self):
        """Run the check on the required views.

        Raises:
            IndexError: If either view has no row for nafta in 2020.
            AssertionError: If any checked value is missing or not strictly positive.
        """
        with self.input()['auxiliary'].open('r') as f:
            job_info = json.load(f)

        database_loc = job_info['database']

        connection = sqlite3.connect(database_loc)
        # Close the connection even when a check fails so a failed run does not leak the handle.
        try:
            self._check_positive_row(
                connection,
                '''
                SELECT
                    gdp,
                    population
                FROM
                    auxiliary
                WHERE
                    year = 2020
                    AND region = 'nafta'
                ''',
                'auxiliary'
            )

            self._check_positive_row(
                connection,
                '''
                SELECT
                    consumptionPackagingMT,
                    eolRecyclingPercent
                FROM
                    summary
                WHERE
                    year = 2020
                    AND region = 'nafta'
                ''',
                'summary'
            )
        finally:
            connection.close()

        # Only written after both checks pass; the payload is the upstream job info unchanged.
        with self.output().open('w') as f:
            json.dump(job_info, f)

    def _check_positive_row(self, connection, query, description):
        """Ensure the first row returned by a query contains only positive values.

        Args:
            connection: Open sqlite3 connection on which to run the query.
            query: SQL selecting the values to be checked.
            description: Name of the view being checked, used in error messages.

        Raises:
            IndexError: If the query returns no rows.
            AssertionError: If any value in the first row is missing or not strictly positive.
        """
        cursor = connection.cursor()
        try:
            cursor.execute(query)
            row = cursor.fetchone()
        finally:
            cursor.close()

        if row is None:
            raise IndexError('No %s row found for nafta in 2020.' % description)

        # Raise explicitly instead of using assert so the check still runs under python -O.
        for value in row:
            if value is None or not value > 0:
                raise AssertionError(
                    'Expected positive %s values for nafta in 2020 but got %r.' % (
                        description,
                        row
                    )
                )
class BuildMlViewsTask(tasks_sql.SqlExecuteTask):
    """Build the derivative views required by individual models.

    Supplies a series of SQL scripts which construct views deriving from other views, addressing
    the needs of specific modeling tasks ahead of sweeps.
    """

    task_dir = luigi.Parameter(default=const.DEFAULT_TASK_DIR)

    def requires(self):
        """Depend on the check of the prerequisite views."""
        prerequisite_check = CheckMlPrepTask(task_dir=self.task_dir)
        return prerequisite_check

    def output(self):
        """Point to the JSON file reporting that the task-specific views were constructed."""
        return luigi.LocalTarget(os.path.join(self.task_dir, '201_build_views.json'))

    def get_scripts(self):
        """Provide the list of scripts to be executed."""
        script_paths = (
            '08_instance/instance_consumption_normal.sql',
            '08_instance/instance_consumption_displaced.sql',
            '08_instance/instance_waste_normal.sql',
            '08_instance/instance_waste_displaced.sql',
            '08_instance/instance_trade_normal.sql',
            '08_instance/instance_trade_displaced.sql',
            '08_instance/instance_waste_trade_normal.sql',
            '08_instance/instance_waste_trade_displaced.sql',
        )
        # Hand back a fresh list so the return type stays a list.
        return list(script_paths)
class CheckMlConsumptionViewTask(tasks_sql.SqlCheckTask):
    """Confirm that the task-specific view for consumption prediction was created."""

    task_dir = luigi.Parameter(default=const.DEFAULT_TASK_DIR)

    def requires(self):
        """Depend on the task-specific views having been built."""
        build_task = BuildMlViewsTask(task_dir=self.task_dir)
        return build_task

    def output(self):
        """Point to the JSON file reporting that the view was checked."""
        return luigi.LocalTarget(os.path.join(self.task_dir, '202_check_consumption.json'))

    def get_table_name(self):
        """Name the table where the consumption view can be found."""
        table_name = 'instance_consumption_displaced'
        return table_name
class CheckMlWasteViewTask(tasks_sql.SqlCheckTask):
    """Confirm the EOL fate propensity view exists.

    Confirm that the task-specific view for waste fate propensity prediction was created
    successfully.
    """

    task_dir = luigi.Parameter(default=const.DEFAULT_TASK_DIR)

    def requires(self):
        """Depend on the task-specific views having been built."""
        build_task = BuildMlViewsTask(task_dir=self.task_dir)
        return build_task

    def output(self):
        """Point to the JSON file reporting that the view was checked."""
        return luigi.LocalTarget(os.path.join(self.task_dir, '203_check_waste.json'))

    def get_table_name(self):
        """Name the table where the waste view can be found."""
        table_name = 'instance_waste_displaced'
        return table_name
class CheckMlTradeViewTask(tasks_sql.SqlCheckTask):
    """Confirm the goods and materials trade view exists.

    Confirm that the task-specific view for goods and materials trade prediction was created
    successfully.
    """

    task_dir = luigi.Parameter(default=const.DEFAULT_TASK_DIR)

    def requires(self):
        """Depend on the task-specific views having been built."""
        build_task = BuildMlViewsTask(task_dir=self.task_dir)
        return build_task

    def output(self):
        """Point to the JSON file reporting that the view was checked."""
        return luigi.LocalTarget(os.path.join(self.task_dir, '204_check_trade.json'))

    def get_table_name(self):
        """Name the table where the trade view can be found."""
        table_name = 'instance_trade_displaced'
        return table_name
class CheckMlWasteTradeViewTask(tasks_sql.SqlCheckTask):
    """Confirm that the task-specific view for waste trade prediction was created."""

    task_dir = luigi.Parameter(default=const.DEFAULT_TASK_DIR)

    def requires(self):
        """Depend on the task-specific views having been built."""
        build_task = BuildMlViewsTask(task_dir=self.task_dir)
        return build_task

    def output(self):
        """Point to the JSON file reporting that the view was checked."""
        # NOTE(review): the file name says "check_trade" rather than "check_waste_trade"; it is
        # still unique thanks to the 205 prefix -- confirm intent before renaming.
        return luigi.LocalTarget(os.path.join(self.task_dir, '205_check_trade.json'))

    def get_table_name(self):
        """Name the table where the waste trade view can be found."""
        table_name = 'instance_waste_trade_displaced'
        return table_name