-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
205 lines (157 loc) · 6.79 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# -*- coding: utf-8 -*-
# ------------------------------------------------
# External imports
# ------------------------------------------------
import requests
# ------------------------------------------------
# Python Imports
# ------------------------------------------------
import json
import csv
# ------------------------------------------------
# Module Imports
# ------------------------------------------------
from errors.v1.handlers import ApiError
from config.v1.app_config import SMTP
import smtplib
import ssl
def read_json_file(json_file):
    """
    Load a JSON document from disk and return the decoded Python object.
    :param json_file: Path of the JSON file to read
    :return: The decoded content (a dict for a JSON object, a list for a JSON array, ...)
    """
    # Decode explicitly as UTF-8 rather than relying on the platform's
    # default text encoding, which is not UTF-8 on every system.
    with open(json_file, encoding="utf-8") as jf:
        return json.load(jf)
def options_filter(data, options):
    """
    Filters a list of dictionaries, or a single dictionary, against an options dict.
    A key is dropped when it appears in options with any value other than True;
    keys that options does not mention are always kept.
    :param data: maybe a list of dicts or a single dict
    :param options: The options to filter on (key -> bool)
    :return: A list of filtered dicts. A single dict is returned wrapped in a
             one-element list, non-dict items inside a list are skipped, and
             any other kind of data is returned unchanged.
    """
    def keep_enabled(data_set):
        # One pass over the items is enough: the result does not depend on
        # how many times the filter is applied.
        return {k: v for k, v in data_set.items() if k not in options or options[k] is True}

    if isinstance(data, list):
        return [keep_enabled(item) for item in data if isinstance(item, dict)]
    if isinstance(data, dict):
        return [keep_enabled(data)]
    return data
def dict_sort(data, sort_by: str, sort_order: str):
    """
    Sort a list of dictionaries by the value stored under one key.

    When every value is a string of decimal digits (e.g. "172") the values are
    compared as integers, so "9" sorts before "10". Otherwise the values are
    compared exactly as they are stored.

    param: data: The list of dictionaries to sort
    param: sort_by: The key whose value decides the order
    param: sort_order: "desc" for descending order; anything else sorts ascending
    returns: a new list holding the same dictionaries, ordered by sort_by.
             Empty (or missing) data gives an empty list.
    """
    if not data:
        return []

    # Decide once whether a numeric comparison is safe. Looking only at the
    # first item is not enough: a later non-numeric value would make int() fail.
    numeric = all(isinstance(d[sort_by], str) and d[sort_by].isdecimal() for d in data)

    def sort_key(item):
        # The conversion happens only inside the key, so the dictionaries are
        # never modified and the values stay the strings they were supplied
        # as (including any leading zeros).
        value = item[sort_by]
        return int(value) if numeric else value

    sorted_list = sorted(data, key=sort_key)
    if sort_order == "desc":
        sorted_list.reverse()
    return sorted_list
def dict_excludes(data: dict, excludes: list) -> dict:
    """
    Build a copy of data that leaves out every key named in excludes.
    :param data: The source dictionary (it is not modified)
    :param excludes: The keys to leave out of the result
    :return: A new dictionary with the remaining key-value pairs
    """
    return {key: value for key, value in data.items() if key not in excludes}
def send_email(receiver_email, subject, message_body):
    """
    Sends a plain-text email to receiver_email address with
    subject and message body, over an SSL connection to the
    server described by the SMTP config (host, port, sender_email,
    sender_password).
    :param receiver_email - Email address of the receiver (To), or a list of addresses
    :param subject - Subject field of the email
    :param message_body - Body of the email. A full stop is appended to it.
    :raises ValueError - if subject contains a line break (rejected by the
            email package, which prevents header injection)
    """
    # Imported here so this function brings its own dependency with it.
    from email.message import EmailMessage

    # Build the message with the email package instead of by hand: it puts
    # the blank line between headers and body and encodes non-ASCII text.
    message = EmailMessage()
    message["Subject"] = subject
    message["From"] = SMTP['sender_email']
    message["To"] = receiver_email if isinstance(receiver_email, str) else ", ".join(receiver_email)
    message.set_content(f"{message_body}.")

    context = ssl.create_default_context()
    with smtplib.SMTP_SSL(SMTP['host'], SMTP['port'], context=context) as server:
        server.login(SMTP['sender_email'], SMTP['sender_password'])
        # Pass the envelope addresses explicitly, as the original sendmail call did
        server.send_message(message, from_addr=SMTP['sender_email'], to_addrs=receiver_email)
# ----------------------------------------------------------------
# THE FOLLOWING TWO FUNCTIONS MIGHT BE USED LATER
# ----------------------------------------------------------------
def csv_create(file, columns, data):
    """
    Generate a csv (Comma Separated) file with a header row followed by one
    row per item in data.
    param: file: Path of the csv file to write (an existing file is overwritten)
    param: columns: The column names - used for the header and to pick the values out of each item
    param: data: An iterable of dictionaries, each holding a value for every column
    raises: Exception - if a row cannot be built or written (e.g. an item lacks a column);
            the original error is attached as the cause
    """
    # newline='' leaves line endings to the csv module; without it, platforms
    # that translate '\n' on write end up with a blank line after every row.
    with open(file, 'w', encoding='UTF8', newline='') as f:
        try:
            writer = csv.DictWriter(f, fieldnames=columns, delimiter=',')
            writer.writeheader()
            for item in data:
                # Only the requested columns are written; extra keys in item are ignored
                writer.writerow({col: item[col] for col in columns})
        except Exception as e:
            print(f"Problem setting writing to csv file --> {str(e)} status_code=500")
            # Still a plain Exception for existing callers, but it now carries
            # a message and the underlying error instead of discarding both.
            raise Exception(f"Problem writing to csv file {file}: {e}") from e
    print("Written CSV file")
def send_file(url, file):
    """
    Send a file to "<url>/post" as a multipart upload using requests and POST
    param: url - the base url to send to ("/post" is appended to it)
    param: file - path of the file to post
    raises: ApiError - when url or file is not a non-empty string, when the
            request itself fails, or when the response status is not 200
            (in which case the response text and status code are passed on)
    """
    # Compare by value/truthiness: "is not ''" tests object identity and is
    # not a reliable way to detect an empty string.
    if not (isinstance(url, str) and url and isinstance(file, str) and file):
        message = f"url {url} and file {file} should be of type str"
        raise ApiError(message=message)

    print("Sending file")
    post_url = f"{url}/post"
    # The context manager closes the file handle whether or not the upload works
    with open(file, 'rb') as fh:
        try:
            response = requests.post(post_url, files={'file': fh})
        except Exception as e:
            error = str(e)
            message = f"Sending file {file} to url {post_url} failed with an error {error}"
            raise ApiError(message=message) from e

    # Checked outside the try block so this ApiError keeps its status code
    # instead of being caught and re-wrapped by the handler above.
    if response.status_code != 200:
        raise ApiError(message=response.text, status_code=response.status_code)
    print("File sent")