-
Notifications
You must be signed in to change notification settings - Fork 0
/
wrapper.py
218 lines (194 loc) · 10.6 KB
/
wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# LIBRARY IMPORTS
import logging
import os
from pathlib import Path
import csv
import boto3
from botocore.exceptions import ClientError
# LOCAL IMPORTS
import nextlevel_config as nc
import nextlevel_strings as nstrings
import secret
# WRAPPER CLASS DEFINITION
class Wrapper():
def __init__(self):
# Make the local directories for saving (if they don't already exist)
self.saveDirString = "/".join(nc.SAVE_DIRECTORY)
if not os.path.exists(str(Path.cwd() / self.saveDirString)):
if nc.ENABLE_CONSOLE_OUTPUT:
print("No save directory detected.")
for i in range(len(nc.SAVE_DIRECTORY)):
os.makedirs(Path.cwd() / ("/".join(nc.SAVE_DIRECTORY[0:i + 1])))
if nc.ENABLE_CONSOLE_OUTPUT:
print("Save directory created.")
# INSTANTIABLE MEMBER VARIABLES
# s3: boto3 client instance
self.s3 = boto3.client("s3")
# token: The unique authentication token from the "Bot" section of your Discord App
self.token = secret.AUTH_TOKEN_DISCORD
# AMAZON WEB SERVICES S3 BUCKET NAME RETRIEVAL
self.bucketName = None
# AWS bucket name retrieval
# Gets the name of the S3 bucket based on Heroku configuration
self.bucketName = os.environ.get('S3_BUCKET')
# Fixes this if running locally
if not nc.FLAG_HEROKU:
self.bucketName = secret.AWS_S3_BUCKET
# DEFAULT AND PLACEHOLDER VALUES OF MEMBER VARIABLES
# The total number of unique wallet addresses which have been submitted through the application
self.allAddressCount = 0
# The total number of discord users who have submitted an address to the application
self.userCount = 0
# userList is a list of lists. The internal lists have the following indices
# USERLIST COLUMN INDICES
# 0: Discord User ID
# 1: Storage Index
# 2: First Timestamp
# 3: Last Timestamp
# 4: Wallet Address
# 5: First Discord Username
# 6: Current Discord Username
self.userList = []
# userDict is a dictionary mapping string keys (Discord User ID) to integer values (user index integer)
self.userDict = {}
# allAddressDict is a dictionary mapping string keys (ALL-UPPERCASE address) to integer values (user index integer)
self.allAddressDict = {}
# DOWNLOADING CONTENT FROM AWS S3 BUCKET
self.downloadS3()
# Loading USER and ADDRESS content from newly-downloaded local files
self.loadUsers()
self.loadAddresses()
# Wrapper creation success message
if nc.ENABLE_CONSOLE_OUTPUT:
print("Wrapper instance created successfully!")
# downloadS3
# Downloads the USER and ADDRESS files from the designated S3 bucket (regardless of public or private ACL settings)
def downloadS3(self):
# Build path strings
userFilepath = str(Path.cwd() / self.saveDirString / (nc.USER_FILE_NAME + nc.FILE_EXTENSION))
addressFilepath = str(Path.cwd() / self.saveDirString / (nc.ADDRESS_FILE_NAME + nc.FILE_EXTENSION))
# AMAZON WEB SERVICES S3 FILE RETRIEVAL
# Attempt to download the USER file from S3
try:
self.s3.download_file(self.bucketName, (self.saveDirString + "/" + (nc.USER_FILE_NAME + nc.FILE_EXTENSION)), userFilepath)
except ClientError as e:
logging.error(e)
if nc.ENABLE_CONSOLE_OUTPUT:
print("ERROR: Failed to download USER information file from AWS S3 Bucket.\nThe bot will proceed as a fresh instance.")
if e.response['Error']['Code'] == "404":
print("\tCODE 404: The object does not exist.")
else:
if nc.ENABLE_CONSOLE_OUTPUT:
print("Downloaded USER information file from AWS S3 Bucket.")
# Attempt to download the ADDRESS file from S3
try:
self.s3.download_file(self.bucketName, (self.saveDirString + "/" + (nc.ADDRESS_FILE_NAME + nc.FILE_EXTENSION)), addressFilepath)
except ClientError as e:
logging.error(e)
if nc.ENABLE_CONSOLE_OUTPUT:
print("ERROR: Failed to download ADDRESS information file from AWS S3 Bucket.\nThe bot will proceed as a fresh instance.")
if e.response['Error']['Code'] == "404":
print("\tCODE 404: The object does not exist.")
else:
if nc.ENABLE_CONSOLE_OUTPUT:
print("Downloaded ADDRESS information file from AWS S3 Bucket.")
# uploadS3
# Uploads the USER and ADDRESS files to the designated S3 bucket (and gives them public-read permissions)
def uploadS3(self):
# Build path strings
userFilepath = str(Path.cwd() / self.saveDirString / (nc.USER_FILE_NAME + nc.FILE_EXTENSION))
addressFilepath = str(Path.cwd() / self.saveDirString / (nc.ADDRESS_FILE_NAME + nc.FILE_EXTENSION))
# Save copies of the existing files before uploading
self.saveUsers()
self.saveAddresses()
# AMAZON WEB SERVICES S3 BUCKET SAVING (UPLOADING)
# Attempt to upload the USER file to S3
try:
self.s3.upload_file(userFilepath, self.bucketName, (self.saveDirString + "/" + (nc.USER_FILE_NAME + nc.FILE_EXTENSION)), ExtraArgs={'ACL':'public-read'})
except ClientError as e:
logging.error(e)
if nc.ENABLE_CONSOLE_OUTPUT:
print("ERROR: Failed to upload USER information file to AWS S3 Bucket.")
else:
if nc.ENABLE_CONSOLE_OUTPUT:
print("Uploaded USER information file to AWS S3 Bucket.")
# Attempt to upload the ADDRESS file to S3
try:
self.s3.upload_file(addressFilepath, self.bucketName, (self.saveDirString + "/" + (nc.ADDRESS_FILE_NAME + nc.FILE_EXTENSION)), ExtraArgs={'ACL':'public-read'})
except ClientError as e:
logging.error(e)
if nc.ENABLE_CONSOLE_OUTPUT:
print("ERROR: Failed to upload ADDRESS information file to AWS S3 Bucket.")
else:
if nc.ENABLE_CONSOLE_OUTPUT:
print("Uploaded ADDRESS information file to AWS S3 Bucket.")
# saveUsers
# Saves, according to app_config.py, a file of USER information from local storage
def saveUsers(self):
# Open the USER save file for WRITING
with open(str(Path.cwd() / self.saveDirString / (nc.USER_FILE_NAME + nc.FILE_EXTENSION)), "w", newline="") as csvFile:
csvWriter = csv.writer(csvFile, delimiter=",")
# Iterate through each user in the userList
for sublist in self.userList:
csvWriter.writerow([str(elt) for elt in sublist])
# Print information about the successful write operation
if nc.ENABLE_CONSOLE_OUTPUT:
print("Successfully saved USER file with " + str(self.userCount) + " rows.")
# saveAddresses
# Saves, according to app_config.py, a file of ADDRESS information from local storage
def saveAddresses(self):
# Open the ADDRESS save file for WRITING
with open(str(Path.cwd() / self.saveDirString / (nc.ADDRESS_FILE_NAME + nc.FILE_EXTENSION)), "w", newline="") as csvFile:
csvWriter = csv.writer(csvFile, delimiter=",")
# Iterate through each user in the allAddressDict
# KEY = ADDRESS (STRING), VALUE = DISCORD USER ID (INT, but cast to string before saving)
for address in self.allAddressDict:
csvWriter.writerow([address, str(self.allAddressDict[address])])
# Print information about the successful write operation
if nc.ENABLE_CONSOLE_OUTPUT:
print("Successfully saved ADDRESS file with " + str(self.allAddressCount) + " rows.")
# loadUsers
# Loads, according to app_config.py, a file of USER information from local storage
def loadUsers(self):
# CHECK FOR LOCAL SAVE FILE INPUT (now that the content has been downloaded)
if os.path.exists(str(Path.cwd() / self.saveDirString / (nc.USER_FILE_NAME + nc.FILE_EXTENSION))):
# Open the USER save file
with open(str(Path.cwd() / self.saveDirString / (nc.USER_FILE_NAME + nc.FILE_EXTENSION)), newline="") as csvFile:
csvReader = csv.reader(csvFile, delimiter=",")
# Iterates through each row in the input file
for row in csvReader:
# Appends a new list into the userList, casting string into integers where appropriate
self.userList.append([int(row[0]), int(row[1]), row[2], row[3], row[4], row[5]])
# Adds the corresponding element into userDict
self.userDict[int(row[0])] = int(row[1])
# Set the user count
self.userCount = len(self.userDict)
# Print information about the successful read operation
if nc.ENABLE_CONSOLE_OUTPUT:
print("Successfully read from USER file with " + str(self.userCount) + " rows.")
# If the file does not yet exist, print a warning to the console
else:
if nc.ENABLE_CONSOLE_OUTPUT:
print("WARNING: There is no USER save file present in the local working directory!")
# loadAddresses
# Loads, according to app_config.py, a file of ADDRESS information from local storage
def loadAddresses(self):
# CHECK FOR LOCAL SAVE FILE INPUT (now that the content has been downloaded)
if os.path.exists(str(Path.cwd() / self.saveDirString / (nc.ADDRESS_FILE_NAME + nc.FILE_EXTENSION))):
# Open the ADDRESS save file
with open(str(Path.cwd() / self.saveDirString / (nc.ADDRESS_FILE_NAME + nc.FILE_EXTENSION)), newline="") as csvFile:
csvReader = csv.reader(csvFile, delimiter=",")
# Iterates through each row in the input file
for row in csvReader:
# Adds the corresponding element into allAddressDict
# KEY = ADDRESS (STRING), VALUE = DISCORD USER ID (INT)
self.allAddressDict[row[0]] = int(row[1])
# Set the address count
self.allAddressCount = len(self.allAddressDict)
# Print information about the successful read operation
if nc.ENABLE_CONSOLE_OUTPUT:
print("Successfully read from ADDRESS file with " + str(self.allAddressCount) + " rows.")
# If the file does not yet exist, print a warning to the console
else:
if nc.ENABLE_CONSOLE_OUTPUT:
print("WARNING: There is no ADDRESS save file present in the local working directory!")