-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclean_show_raw_data.py
123 lines (94 loc) · 3.78 KB
/
clean_show_raw_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import pandas as pd
import matplotlib.pyplot as plt
import math
import statistics
import numpy as np
def get_num_data(int_series):
    """Convert raw readings to floats, imputing entries that are not usable.

    An entry is unusable when ``float()`` rejects it (``ValueError`` or
    ``TypeError``) or when it parses to NaN. Each unusable entry is replaced
    by the mean of the two cleaned values that precede it. If only one
    cleaned value precedes it, that value is reused; if none does (a gap at
    the very start), the first usable value found later in the series is used.

    Args:
        int_series: Iterable of readings (numbers or numeric strings).

    Returns:
        A new list of floats with the same length as the input. An empty
        input yields an empty list.

    Raises:
        ValueError: If the series is non-empty but contains no usable value.
    """
    def _parse(raw):
        # Map anything that is not a usable number to None.
        try:
            number = float(raw)
        except (TypeError, ValueError):
            return None
        return None if math.isnan(number) else number

    parsed = [_parse(raw) for raw in int_series]
    first_valid = next((v for v in parsed if v is not None), None)
    if parsed and first_valid is None:
        raise ValueError("no numeric values found in series")

    cleaned = []
    for number in parsed:
        if number is None:
            if len(cleaned) >= 2:
                number = (cleaned[-1] + cleaned[-2]) / 2
            elif cleaned:
                # Looking two entries back here would wrap around to the
                # end of the series, so fall back to the single predecessor.
                number = cleaned[-1]
            else:
                # Nothing precedes a leading gap; back-fill it instead.
                number = first_valid
        cleaned.append(number)
    return cleaned
def fix_oom(int_series):
    """Rescale readings whose order of magnitude jumps away from the previous one.

    Every reading is reduced to its absolute value and compared with the
    previous corrected value (the first reading is compared with the absolute
    median of the whole series). A reading is kept when the ratio
    ``previous / reading`` lies strictly between 0.1 and 10. Otherwise it is
    multiplied by the power of ten that gives it the same ``floor(log10)`` as
    the previous value, and directly following out-of-range readings reuse
    that same multiplier. A zero reading is emitted as ``0.0``; when the
    previous value is zero the target order of magnitude is taken to be -1.

    Args:
        int_series: Iterable of numeric readings.

    Returns:
        A new list of non-negative values, one per input reading. An empty
        input yields an empty list.
    """
    readings = list(int_series)
    if not readings:
        # statistics.median() raises on empty data; there is nothing to fix.
        return []

    # Only magnitudes are compared below, so the seed must be one as well;
    # a negative seed would end up in math.log10() and raise.
    previous = abs(statistics.median(readings))
    in_faulty_run = False
    multiplier = 1
    corrected = []
    for reading in readings:
        magnitude = abs(reading)
        if magnitude == 0:
            fixed = 0.0
            in_faulty_run = False
        elif 0.1 < previous / magnitude < 10:
            fixed = magnitude
            in_faulty_run = False
        else:
            if not in_faulty_run:
                # First bad reading of a run: derive the correction factor
                # once and keep it for the rest of the run.
                reading_oom = math.floor(math.log10(magnitude))
                if previous == 0.0:
                    expected_oom = -1
                else:
                    expected_oom = math.floor(math.log10(previous))
                multiplier = 10 ** (-1 * reading_oom) * 10 ** expected_oom
                in_faulty_run = True
            fixed = magnitude * multiplier
        corrected.append(fixed)
        previous = fixed
    return corrected
def final_outlier_cleanup(int_series, sensitivity):
    """Replace values that lie outside ``mean +/- sensitivity * stddev``.

    The band is inclusive, so a series with zero standard deviation is left
    unchanged. An out-of-band value is replaced by the mean of the two
    cleaned values before it. If only one cleaned value precedes it, that
    value is reused; if none does, the first in-band value of the series is
    used, or the series mean when no value is in band. The number of
    replaced values is printed.

    Args:
        int_series: Iterable of numeric values.
        sensitivity: Half-width of the accepted band, in standard deviations.

    Returns:
        A new list with the same length as the input.
    """
    values = list(int_series)
    cleaned = []
    outlier_count = 0
    if values:
        mean = np.mean(values)
        spread = sensitivity * np.std(values)
        lower, upper = mean - spread, mean + spread
        # Replacement for outliers that appear before any accepted value.
        first_inlier = next((v for v in values if lower <= v <= upper), mean)
        for value in values:
            if lower <= value <= upper:
                cleaned.append(value)
                continue
            outlier_count += 1
            if len(cleaned) >= 2:
                cleaned.append((cleaned[-1] + cleaned[-2]) / 2)
            elif cleaned:
                cleaned.append(cleaned[-1])
            else:
                cleaned.append(first_inlier)
    print(f"No. of outliers: {outlier_count}")
    return cleaned
def read_and_clean_data(path_name, data_type, outlier_sensitivity=4):
    """Load a CSV file and return it with a cleaned value column.

    The rows labelled 0-4 of the parsed file are dropped (presumably
    metadata rows; confirm against the input files) and the columns are
    renamed to ``"datetime"`` and ``data_type``. The value column is then
    passed through ``get_num_data``, ``fix_oom`` and
    ``final_outlier_cleanup`` in that order, and the ``"datetime"`` column
    is converted with ``pd.to_datetime``.

    Args:
        path_name: Location of the CSV file.
        data_type: Name to give the value column.
        outlier_sensitivity: Forwarded to ``final_outlier_cleanup``.

    Returns:
        The cleaned ``DataFrame``.
    """
    frame = pd.read_csv(path_name)
    frame = frame.drop(range(5))
    frame.columns = ["datetime", data_type]

    # Each step reads the current column and writes its result back, so the
    # next step always sees the previous step's output.
    cleaning_steps = (
        get_num_data,
        fix_oom,
        lambda column: final_outlier_cleanup(column, outlier_sensitivity),
    )
    for step in cleaning_steps:
        frame[data_type] = step(frame[data_type])

    frame["datetime"] = pd.to_datetime(frame["datetime"])
    return frame
# /Users/athan/Documents/Wegaw/Aigen Lake Test Data/Snow-Depth-(cm).csv
# /Users/athan/Documents/Wegaw/Aigen Lake Test Data/Temperature (K).csv
# /Users/athan/Documents/Wegaw/Aigen Lake Test Data/sw (mm).csv
if __name__ == "__main__":
    # The snow-depth file is the base table; its path is fixed here.
    # To prompt for it instead: snow_path_name = input("Path name for snow data: ")
    snow_path_name = "/Users/athan/Documents/Wegaw/Aigen Lake Test Data/Snow-Depth-(cm).csv"
    snow_data = read_and_clean_data(snow_path_name, "snow depth (cm)")

    # Ask for further data sets and merge each one into the base table.
    no_data_sets = int(input("How many more data sets? "))
    for _ in range(no_data_sets):
        data_type = input("Data type (with units): ")
        path_name = input("Path name: ")
        snow_data = snow_data.merge(read_and_clean_data(path_name, data_type))

    print(snow_data)

    wants_download = input("Download data? [y/n]: ") == "y"
    if wants_download:
        snow_data.to_csv(
            "/Users/athan/Documents/Wegaw/Aigen Lake Test Data/combined_data.csv",
            index=False,
        )

    # Draw every column after the first against the datetime column.
    value_columns = snow_data.columns[1:]
    for column_name in value_columns:
        plt.plot(snow_data["datetime"], snow_data[column_name])
    plt.xlabel("Time")
    plt.legend(value_columns)
    plt.show()