api.py
import os
import copy
import datetime
import warnings
import numpy as np
import pandas as pd
import seaborn as sns
from pathlib import Path
from datetime import timedelta
from tqdm import tqdm
from operations.template import *
from sklearn.preprocessing import *
from matplotlib import pyplot as plt
from helpers.helper import connect_to_stardog
from sklearn.feature_selection import SelectKBest, f_classif
from operations.recommendation.recommender import Recommender
from kg_augmentor.augmentor import Governor
import sys

warnings.filterwarnings('ignore')
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', None)
class KGFarm:
def __init__(self, mode: str = 'Human in the loop', port: object = 5820, database: str = 'kgfarm_test',
show_connection_status: bool = True):
sys.path.insert(0, str(Path(os.getcwd()).parent.absolute()))
self.mode = mode
        if mode not in ['Human in the loop', 'Automatic']:
            raise ValueError("mode must be either 'Human in the loop' or 'Automatic'")
print('(KGFarm is running in {} mode)'.format(mode))
self.config = connect_to_stardog(port, database, show_connection_status)
if mode == 'Human in the loop':
self.recommender = Recommender()
self.recommender_config = connect_to_stardog(port, db='kgfarm_recommender', show_status=False)
self.governor = Governor(self.config)
self.__table_transformations = {} # cols in enriched_df: tuple -> (entity_df_id, feature_view_id)
"""
conf = SparkConf().setAppName('KGFarm')
conf = (conf.setMaster('local[*]')
.set('spark.executor.memory', '10g')
.set('spark.driver.memory', '5g')
.set('spark.driver.maxResultSize', '5g'))
sc = SparkContext(conf=conf)
self.spark = SparkSession(sc)
"""
# re-arranging columns
@staticmethod
def __re_arrange_columns(last_column: str, df: pd.DataFrame):
features = list(df.columns)
features.remove(last_column)
features.append(last_column)
df = df[features]
return df
def __check_if_profiled(self, df: pd.DataFrame):
table_id = search_entity_table(self.config, list(df.columns))
if len(table_id) == 0:
# search for enriched tables
table_ids = self.__table_transformations.get(tuple(df.columns))
if table_ids is None:
return False # unseen table
else:
return table_ids # enriched table (return table urls which make the enriched table)
else:
return table_id['Table_id'][0] # seen / profiled table
# wrapper around pd.read_csv()
def load_table(self, table_info: pd.Series, print_table_name: bool = True):
table = table_info['Table']
dataset = table_info['Dataset']
if print_table_name:
print(table)
return pd.read_csv(get_table_path(self.config, table, dataset))
def get_entities(self, show_query: bool = False):
entity_df = get_entities(self.config, show_query)
entity_df['Entity_data_type'] = entity_df['Entity_data_type'].map(entity_data_types_mapping)
return entity_df
def get_feature_views(self, feature_view_type: str = 'all', message_status: bool = True, show_query: bool = False):
feature_view_df = get_feature_views_with_one_or_no_entity(self.config, show_query)
feature_view_df = feature_view_df.where(pd.notnull(feature_view_df), None)
feature_view_df.sort_values(by='Feature_view', inplace=True)
if feature_view_type == 'single':
if message_status:
print('Showing feature view(s) with single entity')
feature_view_df = feature_view_df.dropna() # remove feature with no entity
feature_view_df = feature_view_df.reset_index(drop=True)
return feature_view_df
feature_view_M = get_feature_views_with_multiple_entities(self.config, show_query)
# group entities together for feature view with multiple entities
"""
We need to do this because there is no direct/simple way to bind different entities to the associated feature
view, same goes for the physical column, to ease this, the python script below handles these cases.
"""
update_info = []
feature_view_dict = {}
feature_view_to_be_processed = None
for index, feature_view_info in feature_view_M.to_dict('index').items():
if feature_view_to_be_processed == feature_view_info['Feature_view']: # merge
entity_list = feature_view_dict.get('Entity')
entity_list.append(feature_view_info['Entity'])
feature_view_dict['Entity'] = entity_list
column_list = feature_view_dict.get('Physical_column')
column_list.append(feature_view_info['Physical_column'])
feature_view_dict['Physical_column'] = column_list
if index == len(feature_view_M) - 1: # last record
update_info.append(feature_view_dict)
else:
if feature_view_to_be_processed is None: # pass for first record
feature_view_to_be_processed = feature_view_info['Feature_view']
feature_view_dict['Feature_view'] = feature_view_info['Feature_view']
feature_view_dict['Entity'] = [feature_view_info['Entity']]
feature_view_dict['Physical_column'] = [feature_view_info['Physical_column']]
feature_view_dict['Physical_table'] = feature_view_info['Physical_table']
feature_view_dict['File_source'] = feature_view_info['File_source']
continue
update_info.append(feature_view_dict)
feature_view_dict = {}
feature_view_to_be_processed = feature_view_info['Feature_view']
feature_view_dict['Feature_view'] = feature_view_to_be_processed
feature_view_dict['Entity'] = [feature_view_info['Entity']]
feature_view_dict['Physical_column'] = [feature_view_info['Physical_column']]
feature_view_dict['Physical_table'] = feature_view_info['Physical_table']
feature_view_dict['File_source'] = feature_view_info['File_source']
if feature_view_type == 'multiple':
if message_status:
print('Showing feature view(s) with multiple entities')
return pd.DataFrame(update_info)
if feature_view_type == 'single and multiple':
if message_status:
print('Showing feature view(s) with single and multiple entities')
feature_view_df = feature_view_df.dropna() # remove feature with no entity
if feature_view_type == 'all':
if message_status:
print('Showing all feature views')
elif feature_view_type not in ['all', 'single', 'multiple', 'single and multiple']:
raise ValueError("feature_view_type must be 'single', 'multiple', 'single and multiple', or 'all'")
feature_view_df = pd.concat([feature_view_df, pd.DataFrame(update_info)], ignore_index=True)
feature_view_df = feature_view_df.reset_index(drop=True)
        # attach the list of features contained in each feature view
feature_view_df['Features'] = feature_view_df['Feature_view'].apply(
lambda x: get_features_in_feature_views(self.config, x, show_query))
return feature_view_df
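    # Illustrative sketch (assumes the instance sketched above): list feature views,
    # optionally restricted to those backed by a single entity.
    #
    #   feature_views = kgfarm.get_feature_views(feature_view_type='single')
    #   print(feature_views.head())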
def drop_feature_view(self, drop: list):
self.governor.drop_feature_view(drop)
return self.get_feature_views(message_status=False)
def get_optional_physical_representations(self, show_query: bool = False):
optional_physical_representations_df = get_optional_entities(self.config, show_query)
optional_physical_representations_df['Data_type'] = optional_physical_representations_df['Data_type']. \
map(entity_data_types_mapping)
return optional_physical_representations_df
def update_entity(self, entity_to_update_info: list):
self.governor.update_entity(entity_to_update_info)
return self.get_feature_views(message_status=False)
def identify_features(self, entity: str, target: str, show_query: bool = False):
feature_identification_info = identify_features(self.config, entity, target, show_query)
feature_identification_info['Features'] = feature_identification_info.apply(lambda x:
get_columns(self.config,
table=x.Physical_table,
dataset=x.Dataset),
axis=1)
for index, value in feature_identification_info.to_dict('index').items():
features = []
for feature_name in value['Features']:
if entity not in feature_name and target not in feature_name and feature_name != 'event_timestamp':
features.append(feature_name)
feature_identification_info.at[index, 'Features'] = features
return feature_identification_info[['Entity', 'Physical_representation', 'Features', 'Feature_view',
'Physical_table', 'Number_of_rows', 'File_source']]
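    # Illustrative sketch: 'customer_id' and 'churn' are placeholder entity / target names,
    # not values taken from any particular graph.
    #
    #   candidate_features = kgfarm.identify_features(entity='customer_id', target='churn')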
def search_enrichment_options(self, entity_df: pd.DataFrame = None, show_query: bool = False):
# TODO: support for multiple entities.
enrichable_tables = search_enrichment_options(self.config, show_query)
        # drop pairs whose features are identical, i.e. there is nothing to join
for index, pairs in tqdm(enrichable_tables.to_dict('index').items()):
entity_dataset = pairs['Dataset']
entity_table = pairs['Table']
feature_view_dataset = pairs['Dataset_feature_view']
feature_view_table = pairs['Physical_joinable_table']
features_in_entity_df = get_columns(self.config, entity_table, entity_dataset)
features_in_feature_view = get_columns(self.config, feature_view_table, feature_view_dataset)
if set(features_in_feature_view).issubset(
set(features_in_entity_df)): # nothing to enrich as those features already exist
enrichable_tables = enrichable_tables.drop(index)
enrichable_tables = enrichable_tables.sort_values(by=['Table', 'Joinability_strength', 'Enrich_with'],
ascending=False).reset_index(drop=True)
enrichable_tables['Joinability_strength'] = enrichable_tables['Joinability_strength']. \
apply(lambda x: str(int(x * 100)) + '%')
if entity_df is not None:
# filter enrichable_tables dataframe based on columns in entity_df
if not len(search_entity_table(self.config, list(entity_df.columns))):
print('nothing to enrich')
return
entity_table = search_entity_table(self.config, list(entity_df.columns))['Table'][0]
enrichable_tables = enrichable_tables.loc[enrichable_tables['Table'] == entity_table]
enrichable_tables.drop(['Table', 'Table_path', 'Dataset'], axis=1, inplace=True)
# enrichable_tables.rename({'Dataset_feature_view': 'Dataset'}, axis=1, inplace=True)
enrichable_tables = enrichable_tables[['Enrich_with', 'Physical_joinable_table', 'Join_key',
'Joinability_strength', 'File_source', 'Dataset_feature_view']]. \
reset_index(drop=True)
return enrichable_tables
def get_features(self, enrichment_info: pd.Series, entity_df: pd.DataFrame = None, entity_df_columns: tuple = (),
show_status: bool = True):
# TODO: add support for fetching features that originate from multiple feature views at once.
feature_view = enrichment_info['Enrich_with']
if len(entity_df_columns) > 0: # process entity_df passed by the user
entity_df_features = entity_df_columns
else: # process the choice passed by the user from search_enrichment_options
entity_df_features = list(entity_df.columns)
# features in feature view table
feature_view_features = get_columns(self.config, enrichment_info['Physical_joinable_table'],
enrichment_info['Dataset_feature_view'])
# take difference
features = ['{}:'.format(feature_view) + feature for feature in feature_view_features if
feature not in entity_df_features]
if show_status:
print(len(features), 'feature(s) were found!')
return features
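    # Illustrative sketch: features come back as 'FeatureView:feature' strings for the
    # enrichment option chosen from search_enrichment_options().
    #
    #   options = kgfarm.search_enrichment_options(entity_df=df)
    #   features = kgfarm.get_features(options.iloc[0], entity_df=df)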
def recommend_data_transformations(self, entity_df: pd.DataFrame = None, show_query: bool = False,
show_insights: bool = True, n: int = None):
def get_transformation_technique(t, f_values):
if t == 'Ordinal encoding' and len(f_values) > 1:
return 'OrdinalEncoder'
elif t == 'Ordinal encoding' and len(f_values) == 1:
return 'LabelEncoder'
elif t == 'Scaling concerning outliers':
return 'RobustScaler'
elif t == 'Normalization':
return 'MinMaxScaler'
elif t == 'Scaling':
return 'StandardScaler'
elif t == 'Nominal encoding':
return 'OneHotEncoder'
elif t == 'Gaussian distribution':
return 'PowerTransformer'
elif t == 'LabelEncoder' and len(f_values) == 1:
return 'LabelEncoder'
elif t == 'OrdinalEncoder' and len(f_values) > 1:
return 'OrdinalEncoder'
# adds the transformation type mapping to the resultant recommendation dataframe
def add_transformation_type(df):
if df.empty:
print('\nno recommendations found, did you clean your data?\n'
'try using kgfarm.recommend_cleaning_operations()')
return
df['Transformation_type'] = df['Transformation']
df['Transformation'] = df.apply(lambda x: get_transformation_technique(x.Transformation, x.Feature), axis=1)
# TODO: fix this temporary hack
if None in list(df['Transformation']):
df['Transformation'] = df['Transformation_type']
for n_row, v in df.to_dict('index').items():
if v['Transformation'] == 'OrdinalEncoder' or v['Transformation'] == 'LabelEncoder':
df.loc[n_row, 'Transformation_type'] = 'Ordinal encoding'
                elif v['Transformation'] == 'RobustScaler':
df.loc[n_row, 'Transformation_type'] = 'Scaling concerning outliers'
elif v['Transformation'] == 'MinMaxScaler':
df.loc[n_row, 'Transformation_type'] = 'Normalization'
elif v['Transformation'] == 'StandardScaler':
df.loc[n_row, 'Transformation_type'] = 'Scaling'
                elif v['Transformation'] == 'OneHotEncoder':
df.loc[n_row, 'Transformation_type'] = 'Nominal encoding'
elif v['Transformation'] == 'PowerTransformer':
df.loc[n_row, 'Transformation_type'] = 'Gaussian distribution'
# post-processing to reduce false positives
for n_row, v in df.to_dict('index').items():
features_to_be_encoded = []
if v['Transformation'] == 'OneHotEncoder':
for f in v['Feature']:
if len(entity_df[f].value_counts()) <= 5:
features_to_be_encoded.append(f)
if len(features_to_be_encoded) == 0:
df.drop(index=n_row, inplace=True)
else:
df.loc[n_row, ['Feature']] = [features_to_be_encoded]
return df
def handle_unseen_data(n_samples):
if n_samples is None or n_samples > len(entity_df):
n_samples = len(entity_df)
return add_transformation_type(
self.recommender.get_transformation_recommendations(entity_df.sample(n_samples),
show_insight=show_insights))
transformation_info = recommend_feature_transformations(self.config, show_query)
# group features together per transformation
transformation_info_grouped = []
feature = []
pipeline = None
transformation = None
# TODO: test the grouping script (especially for the transformation on single column)
for row_number, value in transformation_info.to_dict('index').items():
if transformation == value['Transformation'] and pipeline == value['Pipeline']:
feature.append(value['Feature'])
if row_number == len(transformation_info) - 1: # last row
row = transformation_info.to_dict('index').get(row_number - 1)
transformation_info_grouped.append({'Transformation': transformation,
'Package': row['Package'],
'Function': row['Function'],
'Library': row['Library'],
'Feature': feature,
'Feature_view': row['Feature_view'],
'Table': row['Table'],
'Dataset': row['Dataset'],
'Author': row['Author'],
'Written_on': row['Written_on'],
'Pipeline': pipeline,
'Pipeline_url': row['Pipeline_url']})
else:
if row_number == 0:
transformation = value['Transformation']
pipeline = value['Pipeline']
feature = [value['Feature']]
continue
row = transformation_info.to_dict('index').get(row_number - 1)
transformation_info_grouped.append({'Transformation': transformation,
'Package': row['Package'],
'Function': row['Function'],
'Library': row['Library'],
'Feature': feature,
'Feature_view': row['Feature_view'],
'Table': row['Table'],
'Dataset': row['Dataset'],
'Author': row['Author'],
'Written_on': row['Written_on'],
'Pipeline': pipeline,
'Pipeline_url': row['Pipeline_url']})
transformation = value['Transformation']
pipeline = value['Pipeline']
feature = [value['Feature']]
transformation_info = pd.DataFrame(transformation_info_grouped)
if {'Package', 'Function', 'Library', 'Author', 'Written_on', 'Pipeline_url'}.issubset(
set(transformation_info.columns)):
transformation_info.drop(['Package', 'Function', 'Library', 'Author', 'Written_on', 'Pipeline_url'],
axis=1, inplace=True)
if entity_df is not None:
table_ids = self.__table_transformations.get(tuple(entity_df.columns))
if not table_ids:
print('Processing unseen data')
return handle_unseen_data(n_samples=n)
tables = list(map(lambda x: get_table_name(self.config, table_id=x), table_ids))
# filtering transformations w.r.t entity_df
for index, value in tqdm(transformation_info.to_dict('index').items()):
if value['Table'] not in tables:
transformation_info.drop(index=index, axis=0, inplace=True)
if len(transformation_info) < 1:
return transformation_info
transformation_info = transformation_info.reset_index(drop=True)
            transformation_info.drop(['Dataset', 'Table'], axis=1, inplace=True)
recommended_transformation = add_transformation_type(transformation_info)
for index, value in recommended_transformation.to_dict('index').items():
if value['Transformation_type'] == 'Ordinal encoding' and len(value['Feature']) > 1:
recommended_transformation.at[index, 'Transformation'] = 'OrdinalEncoder'
return recommended_transformation
def apply_data_transformation(self, transformation_info: pd.Series, entity_df: pd.DataFrame = None,
output_message: str = None):
# TODO: add support for PowerTransformer
if entity_df is not None: # apply transformations directly on entity_df passed by user
df = entity_df
else: # load the table from the choice/row passed by the user from recommend_feature_transformations()
df = self.load_table(transformation_info, print_table_name=False)
transformation = transformation_info['Transformation']
features = transformation_info['Feature']
if transformation == 'LabelEncoder':
print('Applying LabelEncoder transformation')
transformation_model = LabelEncoder()
# label encoding is applied on single feature
df[features[0]] = transformation_model.fit_transform(df[features[0]])
elif transformation == 'StandardScaler':
# print(
# 'CAUTION: Make sure you apply {} transformation only on the train set (This ensures there is no over-fitting due to feature leakage)\n'.format(
# transformation) + \
# 'Use the transformation_model returned from this api to transform test set independently.\n')
print('Applying StandardScaler transformation')
transformation_model = StandardScaler(copy=False)
df[features] = transformation_model.fit_transform(df[features])
elif transformation == 'OrdinalEncoder':
print('Applying OrdinalEncoder transformation')
transformation_model = OrdinalEncoder()
df[features] = transformation_model.fit_transform(df[features])
elif transformation == 'MinMaxScaler':
print('Applying MinMaxScaler transformation')
transformation_model = MinMaxScaler()
df[features] = transformation_model.fit_transform(df[features])
elif transformation == 'OneHotEncoder':
print('Applying OneHotEncoder transformation')
transformation_model = OneHotEncoder(handle_unknown='ignore')
one_hot_encoded_features = pd.DataFrame(transformation_model.fit_transform(df[features]).toarray())
df = df.join(one_hot_encoded_features)
df = df.drop(features, axis=1)
elif transformation == 'RobustScaler':
            print('Applying RobustScaler transformation')
transformation_model = RobustScaler()
df[features] = transformation_model.fit_transform(df[features])
else:
print(transformation, 'not supported yet!')
return
if output_message is None:
print('{} feature(s) {} transformed successfully!'.format(len(features), features))
else:
print('{} feature(s) were transformed successfully!'.format(len(features)))
return df, transformation_model
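    # Illustrative sketch: take one recommendation row and apply it to the dataframe it was
    # recommended for; the fitted transformation model is returned for reuse on held-out data.
    #
    #   recommendations = kgfarm.recommend_data_transformations(entity_df=df)
    #   df, model = kgfarm.apply_data_transformation(recommendations.iloc[0], entity_df=df)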
def recommend_transformations(self, X: pd.DataFrame):
return self.recommender.recommend_transformations(X=X)
@staticmethod
def apply_transformations(X: pd.DataFrame, recommendation: pd.Series):
transformation = recommendation['Recommended_transformation']
feature = recommendation['Feature']
if transformation in {'StandardScaler', 'MinMaxScaler', 'RobustScaler', 'QuantileTransformer',
'PowerTransformer'}:
print(f'Applying {transformation} on {list(X.columns)}')
if transformation == 'StandardScaler':
scaler = StandardScaler()
elif transformation == 'MinMaxScaler':
scaler = MinMaxScaler()
elif transformation == 'RobustScaler':
scaler = RobustScaler()
elif transformation == 'QuantileTransformer':
scaler = QuantileTransformer()
else:
scaler = PowerTransformer()
X[X.columns] = scaler.fit_transform(X=X[X.columns])
return X, scaler
elif transformation in {'Log', 'Sqrt', 'square'}:
print(f'Applying {transformation} on {list(feature)}')
if transformation == 'Log':
def log_plus_const(x, const=0):
return np.log(x + np.abs(const) + 0.0001)
for f in tqdm(feature):
min_neg_val = X[f].min()
unary_transformation_model = FunctionTransformer(func=log_plus_const,
kw_args={'const': min_neg_val}, validate=True)
X[f] = unary_transformation_model.fit_transform(X=np.array(X[f]).reshape(-1, 1))
elif transformation == 'Sqrt':
def sqrt_plus_const(x, const=0):
return np.sqrt(x + np.abs(const) + 0.0001)
for f in tqdm(feature):
min_neg_val = X[f].min()
unary_transformation_model = FunctionTransformer(func=sqrt_plus_const,
kw_args={'const': min_neg_val}, validate=True)
X[f] = unary_transformation_model.fit_transform(X=np.array(X[f]).reshape(-1, 1))
else:
unary_transformation_model = FunctionTransformer(func=np.square, validate=True)
X[feature] = unary_transformation_model.fit_transform(X=X[feature])
return X, transformation
elif transformation in {'OrdinalEncoder', 'OneHotEncoder'}:
print(f'Applying {transformation} on {list(feature)}')
if transformation == 'OrdinalEncoder':
encoder = OrdinalEncoder()
X[feature] = encoder.fit_transform(X=X[feature])
else:
encoder = OneHotEncoder(handle_unknown='ignore')
one_hot_encoded_features = pd.DataFrame(encoder.fit_transform(X[feature]).toarray())
X = X.join(one_hot_encoded_features)
X = X.drop(feature, axis=1)
return X, encoder
else:
raise ValueError(f'{transformation} not supported')
# unary_categorical_transformations = recommendations[recommendations.loc['Transformation_type'] in {'ordinal encoding', 'nominal encoding'}]
# unary_numerical_transformations = recommendations[recommendations.loc['Transformation_type'] == 'unary transformation']
# scaling_transformation = recommendations[recommendations.loc['Transformation_type'] == 'scaling']
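    # Illustrative sketch of the statistics-driven path, assuming the recommender returns rows
    # carrying 'Recommended_transformation' and 'Feature' as consumed above.
    #
    #   recommendations = kgfarm.recommend_transformations(X)
    #   X, fitted = KGFarm.apply_transformations(X, recommendations.iloc[0])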
def enrich(self, enrichment_info: pd.Series, entity_df: pd.DataFrame = None, freshness: int = 10):
if entity_df is not None: # entity_df passed by the user
# get features to be enriched with
features = self.get_features(enrichment_info=enrichment_info, entity_df_columns=tuple(entity_df.columns),
show_status=False)
features = [feature.split(':')[-1] for feature in features]
print('Enriching {} with {} feature(s) {}'.format('entity_df', len(features), features))
else: # option selected from search_enrichment_options()
entity_df = pd.read_csv(enrichment_info['Table_path'])
features = self.get_features(enrichment_info=enrichment_info, entity_df=entity_df, show_status=False)
features = [feature.split(':')[-1] for feature in features]
print('Enriching {} with {} feature(s) {}'.format(enrichment_info['Table'], len(features), features))
source_table_id = search_entity_table(self.config, entity_df.columns)['Table_id'][
0] # needed to track tables after enrichment
# parse row passed as the input
feature_view = pd.read_csv(enrichment_info['File_source'])
        join_key = enrichment_info['Join_key']
        last_column = list(entity_df.columns)[-1]  # for re-arranging columns
        # add timestamp and join-key to features in feature view to perform join
        features.extend([join_key, 'event_timestamp'])
        feature_view = feature_view[features]
        enriched_df = pd.merge(entity_df, feature_view, on=join_key)
for row, row_info in tqdm(enriched_df.to_dict('index').items()):
timestamp_entity = datetime.datetime.strptime(row_info['event_timestamp_x'], '%Y-%m-%d %H:%M:%S.%f')
timestamp_feature_view = datetime.datetime.strptime(row_info['event_timestamp_y'], '%Y-%m-%d %H:%M:%S.%f')
"""
delete record if the following either of the following 2 conditions were violated:
1. Timestamp of entity < Timestamp of feature view or
2. Timestamp of entity - freshness > timestamp of feature view
"""
if timestamp_entity < timestamp_feature_view or timestamp_entity - timedelta(
days=freshness) > timestamp_feature_view:
enriched_df.drop(index=row, axis=0, inplace=True)
enriched_df.drop('event_timestamp_y', axis=1, inplace=True)
enriched_df.rename(columns={'event_timestamp_x': 'event_timestamp'}, inplace=True)
# re-arrange columns
columns = list(enriched_df.columns)
columns.remove('event_timestamp')
columns.insert(0, 'event_timestamp')
enriched_df = enriched_df[columns]
        enriched_df = enriched_df.sort_values(by=join_key).reset_index(drop=True)
enriched_df = self.__re_arrange_columns(last_column, enriched_df)
# maintain enrichment details (columns in enriched dataset : (table_ids of the joined tables))
self.__table_transformations[tuple(enriched_df.columns)] = (source_table_id,
get_physical_table(self.config,
feature_view=enrichment_info[
'Enrich_with']))
return enriched_df
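    # Illustrative enrichment workflow: search for joinable feature views, pick one option,
    # and let enrich() perform the freshness-aware join (freshness is in days).
    #
    #   options = kgfarm.search_enrichment_options(entity_df=df)
    #   enriched_df = kgfarm.enrich(options.iloc[0], entity_df=df, freshness=10)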
def __get_features(self, entity_df: pd.DataFrame, filtered_columns: list, show_query: bool = False):
table_id = search_entity_table(self.config, list(entity_df.columns))
if len(table_id) < 1:
print('Searching features for enriched dataframe\n')
table_ids = self.__table_transformations.get(tuple(entity_df.columns))
return [feature for feature in list(entity_df.columns) if feature not in
get_features_to_drop(self.config, table_ids[0], show_query)[
'Feature_to_drop'].tolist() and feature not in
get_features_to_drop(self.config, table_ids[1], show_query)[
'Feature_to_drop'].tolist() and feature in
filtered_columns]
else:
table_id = table_id['Table_id'][0]
return [feature for feature in list(entity_df.columns)
if
feature not in get_features_to_drop(self.config, table_id, show_query)['Feature_to_drop'].tolist()
and feature in filtered_columns]
def select_features(self, entity_df: pd.DataFrame, dependent_variable: str, select_by: str = None,
plot_correlation: bool = True,
plot_anova_test: bool = True, show_f_value: bool = False):
def handle_unseen_data():
raise NotImplementedError('under construction')
def handle_data_by_statistics(dependent_var, f_score):
def get_input():
                return int(input(f'Enter k (where k is the top-k ranked features out of {len(df.columns)} feature(s)): '))
if select_by not in {'anova', 'correlation'}:
raise ValueError("select_by can either be 'anova' or 'correlation'")
if select_by == 'anova':
f_score = f_score.head(get_input())
independent_var = df[f_score['Feature']] # features (X)
print('Top {} feature(s) {} were selected based on highest F-value'.
format(len(independent_var.columns), list(independent_var.columns)))
return independent_var, dependent_var
elif select_by == 'correlation':
correlation = df.corr()
correlation.drop(index=dependent_variable, inplace=True)
columns = list(correlation.columns)
columns = [f for f in columns if f != dependent_variable]
correlation.drop(columns, axis=1, inplace=True)
correlation.sort_values(by=dependent_variable, ascending=False, inplace=True)
top_k_features = correlation.head(get_input())
top_k_features = list(top_k_features.to_dict().get(dependent_variable))
independent_var = df[top_k_features] # features (X)
if self.mode != 'Automatic':
print('Top {} feature(s) {} were selected based on highest Correlation'.
format(len(independent_var.columns), list(independent_var.columns)))
return independent_var, dependent_var
df = copy.copy(entity_df)
for feature in tqdm(list(entity_df.columns)): # drop entity column and timestamp
if is_entity_column(self.config, feature=feature, dependent_variable=dependent_variable) \
or feature == 'event_timestamp':
df.drop(feature, axis=1, inplace=True)
print('Analyzing features')
if plot_correlation: # plot pearson correlation
plt.rcParams['figure.dpi'] = 300
corr = df.corr(method='pearson')
plt.figure(figsize=(15, 10))
sns.heatmap(corr, annot=True, cmap='Greens')
plt.show()
# calculate F-value for features
y = entity_df[dependent_variable] # dependent variable
X = df.drop(dependent_variable, axis=1) # independent variables
best_features = SelectKBest(score_func=f_classif, k=5).fit(X, y)
scores = pd.DataFrame(best_features.scores_)
features = pd.DataFrame(X.columns)
feature_scores = pd.concat([scores, features], axis=1)
feature_scores.columns = ['F_value', 'Feature']
feature_scores['F_value'] = feature_scores['F_value'].apply(lambda x: round(x, 2))
feature_scores = feature_scores.sort_values(by='F_value', ascending=False).reset_index(drop=True)
if plot_anova_test: # plot ANOVA test graph
plt.rcParams['figure.dpi'] = 300
plt.figure(figsize=(15, 10))
sns.set_color_codes('pastel')
sns.barplot(x='F_value', y='Feature', data=feature_scores,
label='Total', palette="Greens_r", edgecolor='none')
sns.set_color_codes('muted')
sns.despine(left=True, bottom=True)
plt.xlabel('F value', fontsize=15)
plt.ylabel('Feature', fontsize=15)
plt.grid(color='lightgray', axis='y')
plt.tick_params(axis='both', which='major', labelsize=15)
plt.show()
if show_f_value:
print(feature_scores, '\n')
table_id = search_entity_table(self.config, list(entity_df.columns))
if len(table_id) < 1: # i.e. table not profiled
table_ids = self.__table_transformations.get(tuple(entity_df.columns))
if table_ids is None:
# return handle_data_by_statistics(dependent_var=y, f_score=feature_scores)
if select_by in {'anova', 'correlation'}:
return handle_data_by_statistics(dependent_var=y, f_score=feature_scores)
else:
print('processing unseen data')
handle_unseen_data()
if select_by is None: # select by pipelines
X = entity_df[self.__get_features(entity_df=entity_df, filtered_columns=list(df.columns))]
print('{} feature(s) {} were selected based on previously abstracted pipelines'.format(len(X.columns),
list(X.columns)))
return X, y
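    # Illustrative sketch: 'target' is a placeholder dependent variable; for unseen tables,
    # select_by='anova' or 'correlation' falls back to statistics and prompts for k.
    #
    #   X, y = kgfarm.select_features(entity_df=df, dependent_variable='target', select_by='anova')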
@staticmethod
def get_columns_to_be_cleaned(df: pd.DataFrame):
for na_type in {'none', 'n/a', 'na', 'nan', 'missing', '?', '', ' '}:
if na_type in {'?', '', ' '}:
df.replace(na_type, np.nan, inplace=True)
else:
df.replace('(?i)' + na_type, np.nan, inplace=True, regex=True)
columns = pd.DataFrame(df.isnull().sum())
columns.columns = ['Missing values']
columns['Feature'] = columns.index
columns = columns[columns['Missing values'] > 0]
columns.sort_values(by='Missing values', ascending=False, inplace=True)
columns.reset_index(drop=True, inplace=True)
return columns
def recommend_cleaning_operations(self, entity_df: pd.DataFrame, visualize_missing_data: bool = True,
show_query: bool = False):
"""
1. visualize missing data
2. check if data is profiled or unseen
3. if unseen align and query else query directly
"""
def plot_heat_map(df: pd.DataFrame):
plt.rcParams['figure.dpi'] = 300
plt.figure(figsize=(15, 7))
sns.heatmap(df.isnull(), yticklabels=False, cmap='Greens_r')
plt.show()
def plot_bar_graph(columns: pd.DataFrame):
if len(columns) == 0:
return
sns.set_color_codes('pastel')
plt.rcParams['figure.dpi'] = 300
plt.figure(figsize=(6, 3))
ax = sns.barplot(x="Feature", y="Missing values", data=columns,
palette='Greens_r', edgecolor='gainsboro')
ax.bar_label(ax.containers[0], fontsize=6)
def change_width(axis, new_value):
for patch in axis.patches:
current_width = patch.get_width()
diff = current_width - new_value
patch.set_width(new_value)
patch.set_x(patch.get_x() + diff * .5)
change_width(ax, .20)
plt.grid(color='lightgray', axis='y')
plt.ylabel('Missing value', fontsize=5.5)
plt.xlabel('')
ax.tick_params(axis='both', which='major', labelsize=5.5)
ax.tick_params(axis='x', labelrotation=90, labelsize=5.5)
plt.show()
"""
def recommend_cleaning_operations_for_unseen_data(list_of_similar_tables: list, display: bool):
recommendations = []
for recommended_table in tqdm(list_of_similar_tables):
try:
recommendations.append(get_data_cleaning_recommendation(self.recommender_config,
table_id=recommended_table,
show_query=display))
except StardogException:
continue
display = False
if len(recommendations) == 0:
return False
return pd.concat(recommendations).reset_index(drop=True)
"""
# TODO: fix and test formatting for interpolation
"""
def reformat_recommendations(df: pd.DataFrame):
def format_columns(c: str):
if isinstance(c, str):
return urllib.parse.unquote_plus(c).split('/')[-1]
else:
return list(columns_to_be_cleaned['Feature'])
df['Feature'] = df['Column_id'].apply(lambda x: format_columns(x))
df = df.drop('Column_id', axis=1).dropna(how='any').reset_index(drop=True)
pipeline = None
operation = None
parameters_per_pipeline = {}
params = {}
features = []
for row_number, recommendation_info in df.to_dict('index').items():
if isinstance(recommendation_info['Feature'], list):
continue
if pipeline == recommendation_info['Pipeline'] and operation == data_cleaning_operation_mapping.get(
recommendation_info['Function']):
params.update({recommendation_info['Feature']: recommendation_info['Value']})
features.append(recommendation_info['Feature'])
if row_number == len(df) - 1: # save for last record
parameters_per_pipeline[pipeline] = [operation, features, params]
else:
if row_number == 0: # first pipeline
pipeline = recommendation_info['Pipeline']
operation = data_cleaning_operation_mapping.get(recommendation_info['Function'])
features = [recommendation_info['Feature']]
params.update({recommendation_info['Feature']: recommendation_info['Value']})
continue
if row_number == len(df) - 1: # save for last record
parameters_per_pipeline[pipeline] = [operation, features, params]
else:
# save current pipeline_info
parameters_per_pipeline[pipeline] = [operation, features, params]
# update new pipeline
pipeline = recommendation_info['Pipeline']
operation = data_cleaning_operation_mapping.get(recommendation_info['Function'])
features = [recommendation_info['Feature']]
params = {recommendation_info['Feature']: recommendation_info['Value']}
operation = []
params = []
features = []
for row in list(parameters_per_pipeline.values()):
operation.append(row[0])
features.append(row[1])
params.append(row[2])
df = pd.DataFrame({'Operation': operation,
'Feature': features,
'Parameters': params,
'Pipeline': list(parameters_per_pipeline.keys())})
# check if features are present in actual entity_df
updated_params = []
for row_number, recommendation_info in df.to_dict('index').items():
features = recommendation_info['Feature']
parameters = recommendation_info['Parameters']
if '<src.Calls' in parameters:
df.drop(index=row_number, axis=1, inplace=True)
continue
flag = True
params = {}
for f in features:
if f.replace(' ', '') in entity_df.columns:
features[features.index(f)] = f.replace(' ', '')
value = parameters.get(f)
params.update({f.replace(' ', ''): value})
else:
flag = False
if not flag:
df.drop(index=row_number, axis=1, inplace=True)
continue
updated_params.append(params)
df['Parameters'] = updated_params
return df.dropna(how='any').reset_index(drop=True)
"""
print('scanning missing values')
columns_to_be_cleaned = self.get_columns_to_be_cleaned(entity_df)
if visualize_missing_data:
plot_heat_map(df=entity_df)
plot_bar_graph(columns=columns_to_be_cleaned)
if len(columns_to_be_cleaned) == 0:
print('nothing to clean')
return entity_df
table_id = self.__check_if_profiled(df=entity_df)
if table_id is not False: # seen data
if isinstance(table_id, tuple):
recommendations_for_enriched_tables = []
for ids in table_id:
recommendations_for_enriched_tables.append(get_data_cleaning_recommendation(self.config,
table_id=ids,
show_query=show_query))
data_cleaning_info = pd.concat(recommendations_for_enriched_tables)
else:
data_cleaning_info = get_data_cleaning_recommendation(self.config, table_id=table_id,
show_query=show_query)
# reformat seen cleaning info
data_cleaning_info['Parameters'] = data_cleaning_info.apply(lambda x: {x.Parameter: x.Value}, axis=1)
data_cleaning_info['Operation'] = data_cleaning_info['Function'].apply(
lambda x: data_cleaning_operation_mapping.get(x))
data_cleaning_info['Feature'] = data_cleaning_info['Column_id'].apply(
lambda x: list(columns_to_be_cleaned['Feature']))
data_cleaning_info = data_cleaning_info[['Operation', 'Feature', 'Parameters', 'Pipeline']]
return data_cleaning_info
elif table_id is False: # unseen data
print('finding similar columns and tables to entity dataframe')
similar_tables = self.recommender.get_cleaning_recommendation(
entity_df[columns_to_be_cleaned['Feature']]) # align
print('Suggestions are:', similar_tables)
return similar_tables
# if len(similar_tables) < top_k:
# top_k = len(similar_tables)
#
# similar_tables = similar_tables[:top_k]
#
# if len(similar_tables) == 0:
# print('no recommendations, try using kgfarm.clean()')
# return
#
# raw_recommendations = recommend_cleaning_operations_for_unseen_data(list_of_similar_tables=similar_tables,
# display=show_query) # query
#
# if raw_recommendations is False or raw_recommendations.empty:
# print('no recommendations, try using kgfarm.clean()')
# return
# else:
# cleaning_recommendations = reformat_recommendations(df=raw_recommendations)
# if cleaning_recommendations.empty:
# print('no recommendations, try using kgfarm.clean()')
# return
# else:
# return cleaning_recommendations
def clean(self, entity_df: pd.DataFrame, cleaning_info: pd.Series = None, technique: str = None):
"""
cleans entity_df from info coming from kgfarm.recommend_cleaning_operations
"""
def check_for_uncleaned_features(df: pd.DataFrame): # clean by recommendations
uncleaned_features = list(self.get_columns_to_be_cleaned(df=df)['Feature'])
if len(uncleaned_features) == 0:
print('\nall features look clean')
else:
print(f'\n{uncleaned_features} are still uncleaned')
if cleaning_info is not None:
if cleaning_info['Operation'] == 'Fill missing values':
entity_df.fillna(cleaning_info['Parameters'], inplace=True)
print(f'filled missing values for {cleaning_info["Feature"]} feature(s)')
elif cleaning_info['Operation'] == 'Interpolate':
params = cleaning_info['Parameters']
method = params.get('method')
print(f'interpolated missing values for {cleaning_info["Feature"]} feature(s)')
entity_df.interpolate(method=method, inplace=True)
else:
features = list(cleaning_info['Feature'])
entity_df.dropna(subset=features, how='any', inplace=True)
print(f'dropped missing values for {cleaning_info["Feature"]} feature(s)')
check_for_uncleaned_features(df=entity_df)
return entity_df
elif technique is not None: # clean by human-in-the-loop
columns = list(self.get_columns_to_be_cleaned(df=entity_df)['Feature'])
if technique == 'drop':
entity_df.dropna(how='any', inplace=True)
entity_df.reset_index(drop=True, inplace=True)
print(f'missing values from {columns} were dropped')
check_for_uncleaned_features(df=entity_df)
return entity_df
elif technique == 'fill' or technique == 'quick clean':
def get_mode(feature: pd.Series): # fill categorical data with mode
return feature.mode()[0]
if technique == 'quick clean':
for column in tqdm(columns):
mode = get_mode(entity_df[column])
entity_df[column].fillna(mode, inplace=True)
return entity_df
fill_value = input(
"Enter the value to fill the missing data or 'mean', 'median', 'mode' to fill by statistics")
if fill_value not in {'mean', 'median', 'mode'}: # fill constant value
entity_df.fillna(fill_value, inplace=True)
else:
if fill_value == 'median':
entity_df.fillna(entity_df.median(), inplace=True)
elif fill_value == 'mean':
entity_df.fillna(entity_df.mean(), inplace=True)
else:
for column in tqdm(columns):
mode = get_mode(entity_df[column])
entity_df[column].fillna(mode, inplace=True)
entity_df.reset_index(drop=True, inplace=True)
print(f'missing values from {columns} were filled with {fill_value}')
check_for_uncleaned_features(df=entity_df)
return entity_df
elif technique == 'interpolate':
try:
entity_df.interpolate(inplace=True)
entity_df.reset_index(drop=True, inplace=True)
print(f'missing values from {columns} were interpolated')
except TypeError:
print('only numerical features can be interpolated')
check_for_uncleaned_features(df=entity_df)
return entity_df
            else:
                raise ValueError("technique must be one of 'drop', 'fill', 'quick clean' or 'interpolate'")
"""
def recommend_features_to_be_selected(self, entity_df: pd.DataFrame, dependent_variable: str, k: int):
recommended_features = list(self.recommender.get_feature_selection_score(entity_df=entity_df,
dependent_variable=dependent_variable).head(
k)['Feature'])
# print(f'Recommending top-{k} feature(s) {recommended_features}')
return entity_df[recommended_features], entity_df[dependent_variable] # return X, y
"""
def recommend_features_to_be_selected(self, task: str, X: pd.DataFrame, y: pd.Series,
k: int = None):
entity_df = pd.concat([X, y], axis=1)
if k is None or len(entity_df) < k:
k = len(entity_df)
return self.recommender.get_feature_selection_score(task=task, entity_df=entity_df.sample(n=k, random_state=1),
dependent_variable=str(y.name))
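# Illustrative end-to-end sketch with placeholder names ('target' and 'classification' are examples only):
#
#   kgfarm = KGFarm()
#   df = kgfarm.clean(df, technique='quick clean')
#   X, y = kgfarm.select_features(df, dependent_variable='target')
#   scores = kgfarm.recommend_features_to_be_selected(task='classification', X=X, y=y, k=100)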