-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathexp-02-configuration-based-transforms.py
92 lines (71 loc) · 3.17 KB
/
exp-02-configuration-based-transforms.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# Aim of this experiment is to see if we can do configuration based transformation of data
# For example if we had a configuration file that had the following lines in it:
# SEX, ->, UPM.Sex
# TITLE, ->, UPM.Title
# In the example above -> means "maps to" and this will look for files called SEX_MAP.csv and TITLE_MAP.csv
# it will then use the values in these files to map values from the SOURCE to the DESTINATION
import pandas as pd
import os
import csv
def load_etl_configuration(configPath):
    """Load the top-level ETL configuration CSV into a DataFrame.

    The first row of the file supplies the column names; every remaining
    row becomes one record. All cell values stay as strings (no pandas
    type inference), matching the raw CSV content.

    Args:
        configPath: path to the ETL configuration CSV (e.g. CONFIG/ETL.csv).

    Returns:
        pandas.DataFrame whose columns come from the file's first row.
    """
    with open(configPath, newline='') as csvfile:
        rows = list(csv.reader(csvfile, delimiter=','))
    # Build the frame directly: first row is the header, the rest are data.
    # (Replaces the previous from_records + rebuild round trip.)
    return pd.DataFrame(rows[1:], columns=rows[0])
def csv_to_dictionary(csvPath):
    """Read a two-column mapping CSV into a {source: destination} dict.

    Args:
        csvPath: path to a mapping file such as Maps/SEX_MAP.csv.

    Returns:
        dict mapping each row's first field to its second field; if a key
        appears more than once, the last occurrence wins.
    """
    # newline='' is the csv-module recommended mode for reading.
    with open(csvPath, mode='r', newline='') as infile:
        reader = csv.reader(infile)
        # 'lookup' instead of 'map' — don't shadow the builtin.
        lookup = {row[0]: row[1] for row in reader}
    return lookup
def load_transformation_configuration(configPath):
    """Read a per-file transform configuration CSV.

    Args:
        configPath: path to the transform rules file for one data file.

    Returns:
        list of rows, each row a list of string fields
        (typically [source, operator, destination]).
    """
    with open(configPath) as csvfile:
        return list(csv.reader(csvfile, delimiter=','))
def filter_maps_from_config(config):
    """Pick out the source columns of every '->' (maps-to) rule.

    Args:
        config: list of [source, operator, destination] rows.

    Returns:
        list of source column names whose operator field is '->'.
    """
    return [rule[0] for rule in config if rule[1] == '->']
def get_lookups_from_transforms(transforms):
    """Load the lookup table for every '->' transform.

    For each mapped source column NAME this reads ./Maps/NAME_MAP.csv
    into a dictionary via csv_to_dictionary.

    Args:
        transforms: list of [source, operator, destination] rows.

    Returns:
        dict of {source_column_name: lookup_dict}.
    """
    maps_dir = os.path.join(os.getcwd(), 'Maps')
    lookups = {}
    for column in filter_maps_from_config(transforms):
        lookups[column] = csv_to_dictionary(os.path.join(maps_dir, column + '_MAP.csv'))
    return lookups
def apply_lookup(key, lookup_dictionary):
    """Translate a single value through a mapping dictionary.

    Args:
        key: the source value to map.
        lookup_dictionary: {source: destination} table from a _MAP.csv file.

    Returns:
        The mapped value if present; otherwise the value stored under
        the "*" wildcard key (None when neither exists).
    """
    if key in lookup_dictionary:
        return lookup_dictionary[key]
    # "*" in the mapping file acts as the catch-all default.
    return lookup_dictionary.get("*")
def get_destination_column_name(source_column_name, config):
    """Find the destination column a source column maps to.

    Args:
        source_column_name: name of the column in the input data.
        config: list of [source, operator, destination] rows.

    Returns:
        The destination name from the first matching rule, or None when
        no rule mentions the source column.
    """
    matches = (rule[2] for rule in config if rule[0] == source_column_name)
    return next(matches, None)
# --- Main driver ---------------------------------------------------------
# ETL.csv lists one (DATA, CONFIG) file pair per row; each DATA file is
# transformed according to its CONFIG rules and pickled into ./output.
ETL_CONFIG_PATH = os.path.join(os.getcwd(), 'CONFIG', 'ETL.csv')
etl_config = load_etl_configuration(ETL_CONFIG_PATH)
for index, row in etl_config.iterrows():
    #print(row['DATA'], row['CONFIG'])
    # Paths in ETL.csv are relative to the current working directory.
    DATA_PATH = os.path.join(os.getcwd(), row['DATA'])
    CONFIG_PATH = os.path.join(os.getcwd(), row['CONFIG'])
    # Per-file transform rules, and the Maps/*.csv lookup tables that
    # the '->' rules reference.
    transforms = load_transformation_configuration(CONFIG_PATH)
    lookups = get_lookups_from_transforms(transforms)
    # We also want to skip the first row, as this contains junk
    # NOTE(review): skiprows=[1] skips the SECOND physical line of the file
    # (0-indexed), i.e. the first data row after the header — confirm that
    # this is the intended "junk" row.
    df = pd.read_csv(DATA_PATH, skiprows=[1])
    df2 = pd.DataFrame()
    # Apply every '->' mapping: each source column is translated value-by-
    # value through its lookup table into the renamed destination column.
    for key in lookups:
        destination_column = get_destination_column_name(key, transforms)
        df2[destination_column] = df[key].apply(apply_lookup, lookup_dictionary=lookups[key])
    # next, apply copy / filter / ignore transforms
    # If you want to see the entire dataframe
    # with pd.option_context('display.max_rows', None, 'display.max_columns', None):
    # print(df2)
    # Derive the output name from the data file's stem.
    # NOTE(review): splitting on "\\" assumes Windows-style path separators
    # in ETL.csv; os.path.basename / pathlib would be portable — TODO confirm.
    output_file_name = (row['DATA'].split("\\")[-1].split(".")[0])
    print(output_file_name)
    df2.to_pickle(os.path.join(os.getcwd(), 'output', output_file_name + '.pickle'))