-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathappend_floppy
executable file
·214 lines (172 loc) · 6.64 KB
/
append_floppy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#!/usr/bin/env python3
import boto3
import csv
import io
import sys
from datetime import datetime
# S3 configuration
BUCKET_NAME = 'ataristdb.sidecartridge.com'
PREFIX = 'db/' # Add the folder prefix here
PREFIX_IMAGE = 'MISC/' # Add the folder prefix here
def download_csv_from_s3(bucket_name, file_name_with_prefix):
"""
Download a CSV file from S3 and return its content as a string.
"""
s3 = boto3.client('s3')
response = s3.get_object(Bucket=bucket_name, Key=file_name_with_prefix)
return response['Body'].read().decode('utf-8')
def parse_csv(csv_content, date_parser=False):
"""
Parse CSV content and return it as a list of dictionaries for easy processing.
"""
parsed_data = []
csv_reader = csv.reader(io.StringIO(csv_content), delimiter=';')
for row in csv_reader:
if len(row) >= 6: # Ensure there are enough columns
# Validate the "timestamp" field
timestamp_value = row[2]
if date_parser:
try:
# Convert to an integer epoch if valid
timestamp_value = int(timestamp_value)
except ValueError:
# Default to 0 if invalid
timestamp_value = 0
# Append the processed row to the parsed data
parsed_data.append({
"title": row[0],
"value1": row[1],
"timestamp": timestamp_value,
"value3": row[3],
"category": row[4],
"file_path": row[5]
})
return parsed_data
def date_to_epoch(date_str):
"""
Convert a date string (YYYY/MM/DD) to epoch time (seconds since Unix epoch).
"""
try:
dt = datetime.strptime(date_str, "%Y/%m/%d")
return int(dt.timestamp())
except ValueError:
# Handle invalid date
return 0
def write_csv_to_string(data):
"""
Convert a list of dictionaries back to a CSV string.
"""
output = io.StringIO()
csv_writer = csv.writer(output, delimiter=';', quoting=csv.QUOTE_ALL) # Enable quoting for all fields
for entry in data:
csv_writer.writerow([
entry["title"],
entry["value1"],
"" if entry["timestamp"] == 0 else entry["timestamp"],
entry["value3"],
entry["category"],
entry["file_path"]
])
return output.getvalue()
def upload_csv_to_s3(bucket_name, file_name_with_prefix, csv_content):
"""
Upload a CSV string to S3.
"""
s3 = boto3.client('s3')
s3.put_object(Bucket=bucket_name, Key=file_name_with_prefix, Body=csv_content)
def backup_file_csv_in_s3(bucket_name, file_name_with_prefix):
"""
Create a backup of a remote CSV file in S3 by appending ".bak" to its name.
"""
s3 = boto3.client('s3')
# Determine the backup file name
backup_file_name_with_prefix = file_name_with_prefix + ".bak"
try:
# Download the original file
print(f"Downloading original file: {file_name_with_prefix}")
response = s3.get_object(Bucket=bucket_name, Key=file_name_with_prefix)
original_content = response['Body'].read()
# Upload the original file content as the backup file
print(f"Uploading backup file: {backup_file_name_with_prefix}")
s3.put_object(Bucket=bucket_name, Key=backup_file_name_with_prefix, Body=original_content)
print(f"Backup created successfully: {backup_file_name_with_prefix}")
except Exception as e:
print(f"Error creating backup: {e}")
raise
def main():
# Check if arguments are passed
if len(sys.argv) < 4:
print("Usage: python append_floppy.py <title> <file_path> <category>")
sys.exit(1)
# Get title, file_path, and category from arguments
title = sys.argv[1]
file_path = sys.argv[2]
category = sys.argv[3]
# Append the image prefix to the file_path
file_path = PREFIX_IMAGE + file_path
# Generate the file name: first letter of title (lowercase) + .csv
file_name = title[0].lower() + ".csv"
file_name_with_prefix = PREFIX + file_name
print(f"Generated file name with prefix: {file_name_with_prefix}")
# Step 1: Download the CSV file from S3
print(f"Downloading {file_name_with_prefix} from S3...")
try:
csv_content = download_csv_from_s3(BUCKET_NAME, file_name_with_prefix)
except Exception as e:
print(f"Error downloading file: {e}")
sys.exit(1)
print("Creating backup...")
backup_file_csv_in_s3(BUCKET_NAME, file_name_with_prefix)
# Step 2: Parse the CSV content
print("Parsing CSV content...")
data_alphabet = parse_csv(csv_content)
# Step 3: Download the CSV file from S3 with what's new
print("Downloading /db/_.csv from S3...")
try:
csv_content = download_csv_from_s3(BUCKET_NAME, PREFIX + "_.csv")
except Exception as e:
print(f"Error downloading file: {e}")
sys.exit(1)
print("Creating backup...")
backup_file_csv_in_s3(BUCKET_NAME, PREFIX + "_.csv")
# Step 4: Parse the CSV content
print("Parsing CSV content...")
data_whatsnew = parse_csv(csv_content, date_parser=True)
# The new entry with the current timestamp
current_timestamp = int(datetime.now().timestamp())
new_entry = {
"title": title,
"value1": "0",
"timestamp": current_timestamp,
"value3": "",
"category": category,
"file_path": file_path
}
# Step 5: Add the new entry to the lists
data_alphabet.append(new_entry)
data_whatsnew.append(new_entry)
# Step 6: Sort the data_alphabet list by title ignoring case
data_alphabet.sort(key=lambda x: x["title"].lower())
# Step 7: Sort the data_whatsnew list by timestamp (newest to oldest)
data_whatsnew.sort(
key=lambda x: x.get("timestamp", 0),
reverse=True
)
# Step 8: Convert the lists back to CSV format
print("Converting data to CSV format...")
csv_alphabet = write_csv_to_string(data_alphabet)
csv_whatsnew = write_csv_to_string(data_whatsnew)
# Step 9: Upload the updated CSV files back to S3
print("Uploading updated files to S3...")
upload_csv_to_s3(BUCKET_NAME, file_name_with_prefix, csv_alphabet)
upload_csv_to_s3(BUCKET_NAME, PREFIX + "_.csv", csv_whatsnew)
print("Update completed successfully.")
# Append new_entry to the file images.log
with open("images.log", "a") as f:
f.write(f'"{title}";"{current_timestamp}";"{category}";"{file_path}"\n')
# print("Updated data:")
# # Print the updated csv content
# print(csv_alphabet)
# print(csv_whatsnew)
if __name__ == "__main__":
main()