-
Notifications
You must be signed in to change notification settings - Fork 0
/
user_log_sql.py
127 lines (94 loc) · 3.44 KB
/
user_log_sql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# Required packages:
# pip install fastapi
# pip install pydantic
# pip install uvicorn
# pip install sqlalchemy
import hmac
import os
import shutil
import time
from hashlib import sha256

from fastapi import FastAPI, Form, UploadFile, File, HTTPException
from fastapi.responses import FileResponse
from sqlalchemy import create_engine, Column, String, Text
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Database setup
# The connection URL may be overridden with the USER_LOG_DATABASE_URL
# environment variable so the service is not tied to one machine's filesystem
# layout; when the variable is unset the original SQLite file is used.
DATABASE_URL = os.environ.get(
    "USER_LOG_DATABASE_URL",
    "sqlite:////Users/yifeicao/PycharmProjects/icucatl/user_log.db",
)
# Engine shared by every session created through SessionLocal.
engine = create_engine(DATABASE_URL)
# Session factory: explicit commits, no automatic flush before queries.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class that the ORM models below inherit from.
Base = declarative_base()
# Models
class UserDB(Base):
    """ORM model for the ``users`` table: one row per registered account."""

    __tablename__ = "users"

    # Account name; doubles as the table's primary key.
    username = Column(
        String,
        primary_key=True,
        index=True,
    )
    # Value returned by hash_password() for the password given at registration.
    password_hash = Column(
        String,
        index=True,
    )
    # Text stored with the account and returned on login; empty by default.
    log = Column(
        Text,
        default="",
    )
# Create the tables declared on Base (here: "users") through the engine above.
Base.metadata.create_all(bind=engine)
# App setup
app = FastAPI()
# Local file storage path
# NOTE: this is a relative path, so it resolves against the current working
# directory of the process that imports this module.
UPLOAD_DIR = "uploaded_files/"
# Ensure the upload directory exists; exist_ok=True makes this a no-op when it
# is already present.
os.makedirs(UPLOAD_DIR, exist_ok=True)
def hash_password(password: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``password``.

    The password is encoded with ``str.encode()``'s default (UTF-8) before
    hashing, and the same input always yields the same 64-character string.

    NOTE(review): a single unsalted SHA-256 round is a weak way to store
    passwords; migrating to a salted, slow KDF would require re-hashing the
    stored values, so the scheme is left unchanged here.
    """
    raw_bytes = password.encode()
    digest = sha256(raw_bytes)
    return digest.hexdigest()
def get_user(db, username: str):
    """Look up a user by name.

    Args:
        db: Session used to run the query.
        username: Value matched against ``UserDB.username``.

    Returns:
        The first matching ``UserDB`` row, or ``None`` when no row matches.
    """
    by_name = UserDB.username == username
    matches = db.query(UserDB).filter(by_name)
    return matches.first()
def save_user(db, user: UserDB):
    """Persist ``user`` and reload its state from the database.

    Args:
        db: Session used to add and commit the row.
        user: The ``UserDB`` instance to store.

    Raises:
        Exception: Whatever the commit raised, re-raised unchanged after the
            session has been rolled back.
    """
    db.add(user)
    try:
        db.commit()
    except Exception:
        # A failed commit leaves the session in a failed transaction; roll it
        # back so the caller can keep using (or cleanly close) the session.
        db.rollback()
        raise
    # Reload the row so the instance reflects what was actually stored.
    db.refresh(user)
def delete_old_files(directory=None, max_age_days=30):
    """Remove regular files whose modification time is older than the cutoff.

    Args:
        directory: Directory to clean. Defaults to ``UPLOAD_DIR`` when omitted.
        max_age_days: Age threshold in days; files last modified more than this
            many days ago are deleted. Defaults to 30.

    Subdirectories are skipped (``os.remove`` cannot delete them), and a file
    that disappears between listing and removal is ignored instead of
    aborting the whole sweep.
    """
    target_dir = UPLOAD_DIR if directory is None else directory
    cutoff = time.time() - (max_age_days * 86400)  # seconds per day
    for filename in os.listdir(target_dir):
        file_path = os.path.join(target_dir, filename)
        if not os.path.isfile(file_path):
            continue
        try:
            if os.path.getmtime(file_path) < cutoff:
                os.remove(file_path)
        except FileNotFoundError:
            # Already removed by someone else; nothing left to clean up.
            continue
@app.post("/register/")
async def register(username: str = Form(), password: str = Form(), log: str = Form(default="")):
    """Create a new user from the submitted form fields.

    Args:
        username: Desired account name; must not already exist.
        password: Plain-text password; only its hash is stored.
        log: Optional initial log text stored with the account.

    Returns:
        A JSON object with a success ``message``.

    Raises:
        HTTPException: 400 when the username is already taken.
    """
    db = SessionLocal()
    try:
        if get_user(db, username):
            raise HTTPException(status_code=400, detail="Username already exists")
        user = UserDB(username=username, password_hash=hash_password(password), log=log)
        try:
            save_user(db, user)
        except IntegrityError:
            # Another request inserted the same username between the lookup
            # above and this commit; report it like any other duplicate
            # instead of surfacing an internal server error.
            db.rollback()
            raise HTTPException(status_code=400, detail="Username already exists") from None
        return {"message": "User registered successfully"}
    finally:
        db.close()
@app.post("/login/")
async def login(username: str = Form(), password: str = Form()):
    """Check the submitted credentials and return the user's stored log.

    Args:
        username: Account name to look up.
        password: Plain-text password to verify against the stored hash.

    Returns:
        A JSON object with the ``username`` and its ``log`` text.

    Raises:
        HTTPException: 401 when the user does not exist or the password does
            not match.
    """
    db = SessionLocal()
    try:
        # Hash before the lookup so the work done does not depend on whether
        # the account exists.
        candidate = hash_password(password).encode()
        user = get_user(db, username)
        # A missing user or a NULL stored hash compares against an empty
        # value, which can never equal a SHA-256 hex digest.
        stored = (user.password_hash or "") if user else ""
        # compare_digest runs in constant time, unlike ``==`` on strings.
        if user and hmac.compare_digest(stored.encode(), candidate):
            return {"username": username, "log": user.log}
        raise HTTPException(status_code=401, detail="Incorrect username or password")
    finally:
        db.close()
@app.post("/upload/")
async def upload_file(username: str = Form(), file: UploadFile = File(...)):
    """Store an uploaded file in ``UPLOAD_DIR`` on behalf of an existing user.

    Args:
        username: Name of the uploading account; it must exist.
        file: The uploaded file.

    Returns:
        A JSON object with the stored ``filename`` and its ``file_url``.

    Raises:
        HTTPException: 404 when the user does not exist, 400 when the
            supplied filename has no usable base name.
    """
    db = SessionLocal()
    try:
        user = get_user(db, username)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        # The filename is client-controlled: keep only its final path
        # component (treating "\" as a separator too) so values such as
        # "../../etc/passwd" cannot escape UPLOAD_DIR.
        safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            raise HTTPException(status_code=400, detail="Invalid filename")
        file_path = os.path.join(UPLOAD_DIR, safe_name)
        with open(file_path, "wb") as buffer:
            # Stream in chunks rather than loading the whole upload in memory.
            shutil.copyfileobj(file.file, buffer)
        return {"filename": safe_name, "file_url": f"/files/{safe_name}"}
    finally:
        db.close()
@app.get("/files/{filename}")
async def get_file(filename: str):
    """Serve a previously uploaded file from ``UPLOAD_DIR``.

    Args:
        filename: Name of the file, taken from the URL path.

    Returns:
        A ``FileResponse`` for the stored file.

    Raises:
        HTTPException: 404 when the name is not a plain file name or no
            regular file with that name exists.
    """
    # Accept only a bare file name: anything containing a path separator or
    # naming a directory reference could point outside UPLOAD_DIR.
    base_name = os.path.basename(filename.replace("\\", "/"))
    if base_name != filename or base_name in ("", ".", ".."):
        raise HTTPException(status_code=404, detail="File not found")
    file_path = os.path.join(UPLOAD_DIR, base_name)
    # isfile (not exists) so a directory is never handed to FileResponse.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path)
@app.on_event("startup")
async def startup_event():
    """Run the old-file cleanup once when the application starts."""
    delete_old_files()
if __name__ == "__main__":
    # Executed only when the file is run as a script: start a uvicorn server
    # for the app, listening on every interface at port 8000.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )