-
Notifications
You must be signed in to change notification settings - Fork 0
/
prediction_validation_insertion.py
120 lines (92 loc) · 4.91 KB
/
prediction_validation_insertion.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
'''
This module handles the validation of prediction files, the insertion of the
validated data into the database, and the fetching of stored data from the database.
'''
import os
from application_logging import logger
from DataTypeValidation_Insertion_Prediction.DataTypeValidationPrediction import \
DbOperation
from Prediction_Raw_Data_Validation.predictionDataValidation import \
PredictionDataValidation
class PredictionValidation:
    """
    Drives validation, database insertion and database retrieval of prediction files.

    Each public method appends its progress messages to the log file at
    ``self.log_path`` through ``self.log_writer``. Any exception raised while a
    method runs is logged once as ``Error: <message>`` and then re-raised.
    """

    def __init__(self):
        self.log_path = "Prediction_Logs/Prediction_Main_Log.txt"
        # The log directory must exist before any method opens the log file.
        # exist_ok=True avoids the race between an isdir() check and makedirs().
        os.makedirs(os.path.dirname(self.log_path), exist_ok=True)
        self.log_writer = logger.App_Logger()

    @staticmethod
    def _check_batch_directory(path: str) -> None:
        """
        Ensure that ``path`` is an existing directory containing at least one entry.

        Raises:
            NotADirectoryError: if ``path`` is not an existing directory.
            FileNotFoundError: if the directory at ``path`` is empty.
        """
        if not os.path.isdir(path):
            raise NotADirectoryError(f"Batch directory not found at {path}")
        if not os.listdir(path):
            raise FileNotFoundError(
                f"No prediction files found in Batch directory {path}.")

    def pred_validation(self, path: str) -> None:
        """
        Validate the prediction files found in the batch directory ``path``.

        The steps run in this order: check the batch directory, clean up the
        Good_Data / Bad_Data folders left over from a previous run (bad files are
        archived first), then run the file-name, column-count and all-NULL-column
        validations provided by ``PredictionDataValidation``.

        Raises:
            NotADirectoryError: if ``path`` is not an existing directory.
            FileNotFoundError: if ``path`` contains no files.
            Exception: anything raised by the validator is logged and re-raised.
        """
        with open(self.log_path, 'a+', encoding='utf-8') as file_object:
            try:
                # Fail fast on an unusable batch directory; the except clause
                # below writes the single "Error: ..." log entry for it.
                self._check_batch_directory(path)

                self.log_writer.log(file_object, "Cleaning up old raw data directories")
                validator = PredictionDataValidation(path)
                validator.delete_existing_good_data_prediction_folder()
                self.log_writer.log(file_object, "Good_Data folder deleted.")

                # Bad files are archived before their folder is removed.
                self.log_writer.log(
                    file_object,
                    "Moving bad files to Archive and deleting Bad_Data folder.")
                validator.move_bad_files_to_archive_bad()
                validator.delete_existing_bad_data_prediction_folder()
                self.log_writer.log(
                    file_object,
                    "Bad files moved to archive!! Bad folder Deleted!!")

                # Validation proper starts here.
                self.log_writer.log(file_object, 'Start validation of files.')
                self.log_writer.log(file_object, 'Getting values from schema file')
                # Only the second value returned by the schema lookup is needed.
                _, no_of_columns = validator.values_from_schema()

                self.log_writer.log(file_object, 'Getting file name regex')
                regex = validator.manual_regex_creation()

                self.log_writer.log(file_object, 'Validating file name using regex')
                validator.validation_filename_raw(regex)

                self.log_writer.log(file_object, 'Validating number of columns')
                validator.validate_column_length(no_of_columns)

                self.log_writer.log(
                    file_object, 'Validating if any column has all NULL values.')
                validator.validate_missing_values_in_whole_column()

                self.log_writer.log(file_object, "Raw Data Validation Complete!!")
            except Exception as e:
                self.log_writer.log(file_object, f"Error: {e}")
                raise

    def pred_insertion(self) -> None:
        """
        Insert the validated data files into the ``predictdb`` database.

        Raises:
            Exception: anything raised by ``DbOperation`` is logged and re-raised.
        """
        with open(self.log_path, 'a+', encoding='utf-8') as file_object:
            try:
                self.log_writer.log(file_object, "Starting database insertion.")
                db_operator = DbOperation()
                self.log_writer.log(file_object, "Created database operator.")
                db_operator.insert_into_table_good_data('predictdb')
                self.log_writer.log(file_object, "Data insertion completed")
            except Exception as e:
                self.log_writer.log(file_object, f"Error: {e}")
                raise

    def pred_fetch(self) -> None:
        """
        Fetch the prediction data set already stored in the ``predictdb`` database.

        The work is delegated to
        ``DbOperation.selecting_data_from_table_into_csv`` with ``flush=False``.

        Raises:
            Exception: anything raised by ``DbOperation`` is logged and re-raised.
        """
        with open(self.log_path, 'a+', encoding='utf-8') as file_object:
            try:
                self.log_writer.log(file_object, "Fetching data sets from database.")
                db_operator = DbOperation()
                self.log_writer.log(file_object, "Created database operator.")
                db_operator.selecting_data_from_table_into_csv('predictdb', flush=False)
                self.log_writer.log(file_object, "Data imported from database.")
            except Exception as e:
                self.log_writer.log(file_object, f"Error: {e}")
                raise