From 4407704e61cbb1562bad7f0e2bb4bd0f3739aa77 Mon Sep 17 00:00:00 2001 From: HemanthSai7 Date: Tue, 7 Nov 2023 23:49:36 +0530 Subject: [PATCH] Generate documentation using Techdocs --- TechdocsAPI/backend/core/ConfigEnv.py | 42 +- TechdocsAPI/backend/core/Exceptions.py | 149 ++- TechdocsAPI/backend/models/generic.py | 19 +- TechdocsAPI/backend/services/auth/ops.py | 183 ++- .../backend/services/auth/utils/auth_funcs.py | 164 ++- .../backend/services/db/utils/DBQueries.py | 80 +- frontend/components/authors.py | 15 +- frontend/components/login.py | 90 +- frontend/components/logo.py | 52 +- frontend/components/logout.py | 18 +- frontend/components/user_greetings.py | 25 +- frontend/layouts/mainlayout.py | 43 +- .../pages/2_\360\237\223\235_Instructions.py" | 34 +- "frontend/pages/3_\360\237\222\273_Demo.py" | 87 +- "frontend/\360\237\217\241_Home.py" | 93 +- logs_2023_11_07_22_45_07.log | 1185 +++++++++++++++++ techdocs/techdocs/cli.py | 18 +- techdocs/techdocs/ops.py | 229 +++- techdocs/techdocs/utils/LoggingManager.py | 108 +- techdocs/techdocs/utils/functools.py | 137 +- techdocs/techdocs/utils/parse.py | 47 +- testing/pack/DBQueries.py | 48 + testing/pack/functools.py | 48 + 23 files changed, 2405 insertions(+), 509 deletions(-) create mode 100644 logs_2023_11_07_22_45_07.log diff --git a/TechdocsAPI/backend/core/ConfigEnv.py b/TechdocsAPI/backend/core/ConfigEnv.py index bc8fb89..4f80efc 100644 --- a/TechdocsAPI/backend/core/ConfigEnv.py +++ b/TechdocsAPI/backend/core/ConfigEnv.py @@ -1,37 +1,39 @@ """Config class for handling env variables. """ from functools import lru_cache - from pydantic import BaseSettings - class Settings(BaseSettings): HOSTNAME: str DATABASE: str UID: str PASSWORD: str - ALGORITHM:str - JWT_SECRET_KEY:str - JWT_REFRESH_SECRET_KEY:str - JWT_VERIFICATION_SECRET_KEY:str - # OPENAI_KEY:str - APP_ID:str - USER_ID:str - MODEL_ID:str - CLARIFAI_PAT:str - MODEL_VERSION_ID:str + ALGORITHM: str + JWT_SECRET_KEY: str + JWT_REFRESH_SECRET_KEY: str + JWT_VERIFICATION_SECRET_KEY: str + APP_ID: str + USER_ID: str + MODEL_ID: str + CLARIFAI_PAT: str + MODEL_VERSION_ID: str + MAIL_USERNAME: str + MAIL_PASSWORD: str + MAIL_FROM: str - MAIL_USERNAME:str - MAIL_PASSWORD:str - MAIL_FROM:str - class Config: - env_file = ".env" - + env_file = '.env' @lru_cache() def get_settings(): - return Settings() + """ + Returns the settings object. + This function returns a settings object of the type Settings. This object contains various configuration + settings used throughout the application. -config = get_settings() + Returns: + Settings: The settings object. + """ + return Settings() +config = get_settings() \ No newline at end of file diff --git a/TechdocsAPI/backend/core/Exceptions.py b/TechdocsAPI/backend/core/Exceptions.py index f314604..2c853f0 100644 --- a/TechdocsAPI/backend/core/Exceptions.py +++ b/TechdocsAPI/backend/core/Exceptions.py @@ -1,57 +1,188 @@ import json - from backend.models import GeneralResponse, TokenSchema - class InvalidCredentialsException(Exception): + def __init__(self, token_result: GeneralResponse): + """ + Initializes an instance of InvalidCredentialsException. + + Arguments: + token_result -- GeneralResponse: The result of a token request. + + Raises: + InvalidCredentialsException: If the token request was unsuccessful. + + """ self.token_result = token_result self.set_statuses() super(InvalidCredentialsException, self).__init__() def set_statuses(self): + """ + This method updates the status of the token_result attribute to 'login_failed'. + + Arguments: + self -- The object instance. + + Raises: + No exceptions are raised by this method. + + Returns: + None, this method doesn't return a value. + """ self.token_result.status = 'login_failed' def __repr__(self): - return "exception.InvalidCredentialsException()" + """ + Returns a string representation of the object. + + This method is used to return a string that represents the object. This string can be used to reconstruct the object. + + Returns: + str: A string representation of the object. + """ + return 'exception.InvalidCredentialsException()' class ExistingUserException(Exception): + def __init__(self, response_result: GeneralResponse): + """ + Initializes an instance of ExistingUserException. + + Args: + response_result: GeneralResponse + The response result object that contains information about the response. + + Raises: + None + + Returns: + None + """ self.response_result = response_result self.set_statuses() super(ExistingUserException, self).__init__() def set_statuses(self): + """ + This function sets the status and message of the response_result object. + + Args: + self: The object of the class. + + Returns: + None + + Raises: + None + """ self.response_result.status = f'failed' - self.response_result.message.append(f'user with this AADHAR Number already has an account') + self.response_result.message.append(f'user already has an account') self.response_result.message[0] = 'authenticated' def __repr__(self): - return "exception.ExistingUserException()" - + """ + This method returns a representation of the object. + + Returns: + exception.ExistingUserException: This exception is raised when a user with the same username already exists. + + Raises: + No exceptions are raised by this method. + + """ + return 'exception.ExistingUserException()' + class InfoNotFoundException(Exception): + def __init__(self, response_result: GeneralResponse, message: str): + """ + Initializes an instance of InfoNotFoundException. + + This is a custom exception class used when information is not found. + + Args: + response_result: GeneralResponse + The response result object. + message: str + A human-readable message describing the error. + + Raises: + Exception + If the initialization fails. + + """ self.response_result = response_result self.message = message self.set_statuses() super(InfoNotFoundException, self).__init__(message) def set_statuses(self): + """ + This method updates the status and message in the response_result dictionary. + + Args: + self: The object instance. + + Attributes: + response_result: A dictionary containing the response result. + + Methods: + self.response_result['status'] = 'abort': Updates the status in the response_result dictionary to 'abort'. + self.response_result['message'][0] = 'authenticated': Updates the first message in the message list in the response_result dictionary to 'authenticated'. + self.response_result['message'].append(self.message): Appends the value of self.message to the message list in the response_result dictionary. + + Returns: + None: This method does not return any value. + + Raises: + None: This method does not raise any exceptions. + """ self.response_result['status'] = 'abort' self.response_result['message'][0] = 'authenticated' self.response_result['message'].append(self.message) def __repr__(self): - return "exception.InfoNotFoundException()" - + """ + Returns a string representation of the object. + + Returns: + str: A string representation of the object. + """ + return 'exception.InfoNotFoundException()' class EmailNotVerifiedException(Exception): + def __init__(self): + """ + Initializes an instance of EmailNotVerifiedException. + + This method sets the statuses and initializes the exception with a message. + + Parameters: + self: The instance of the class. + + Returns: + None: This method doesn't return anything. + + Raises: + None: This method doesn't raise any exceptions. + """ self.set_statuses() super(EmailNotVerifiedException, self).__init__() def set_statuses(self): + """ +def set_statuses(self): + """ self.status = 'EmailNotVerifiedException' def __repr__(self): - return "exception.EmailNotVerifiedException()" + """ + This method returns a representation of the object. + + Returns: + str: A string representation of the object. + """ + return 'exception.EmailNotVerifiedException()' \ No newline at end of file diff --git a/TechdocsAPI/backend/models/generic.py b/TechdocsAPI/backend/models/generic.py index d2a37e6..c8bc1e1 100644 --- a/TechdocsAPI/backend/models/generic.py +++ b/TechdocsAPI/backend/models/generic.py @@ -1,14 +1,25 @@ from pydantic import BaseModel from typing import List - class Base(BaseModel): + @classmethod def get_instance(cls, **kwargs): - return cls(**kwargs) + """ + This is a class method that creates an instance of the class. + Args: + **kwargs: Keyword arguments to be passed to the class constructor. + + Returns: + An instance of the class. + + Raises: + No exceptions are raised by this method. + """ + return cls(**kwargs) class GeneralResponse(Base): - status:str + status: str message: List[str] - data:dict \ No newline at end of file + data: dict \ No newline at end of file diff --git a/TechdocsAPI/backend/services/auth/ops.py b/TechdocsAPI/backend/services/auth/ops.py index add08f6..039de81 100644 --- a/TechdocsAPI/backend/services/auth/ops.py +++ b/TechdocsAPI/backend/services/auth/ops.py @@ -6,16 +6,11 @@ from backend.core.ExceptionHandlers import * from backend.core.ConfigEnv import config from backend import app - from fastapi import HTTPException, BackgroundTasks from pydantic import ValidationError from jose import jwt - from fastapi_mail import MessageSchema, MessageType -# import openai -# from transformers import RobertaTokenizer, T5ForConditionalGeneration - async def ops_signup(bgtasks: BackgroundTasks, response_result: GeneralResponse, data: UserAuth): """Wrapper method to handle signup process. @@ -28,59 +23,60 @@ async def ops_signup(bgtasks: BackgroundTasks, response_result: GeneralResponse, Raises: ExistingUserException: If account with entered AADHAR Number already exists. """ - # querying database to check if user already exist + + # querying database to check if user already exist user = DBQueries.fetch_data_from_database('auth', ['username', 'email'], f"username='{data.username}' OR email='{data.email}'") if len(list(user)) != 0: # user with the entered credentials already exists raise ExistingUserException(response_result) - verifiction_token = Auth.create_access_token(f"{data.username} {data.email}", secret_name='VERIFICATION') - verification_link = f"http://localhost:8000/auth/verify/{verifiction_token}" + + verifiction_token = Auth.create_access_token(f'{data.username} {data.email}', secret_name='VERIFICATION') + verification_link = f'https://caffeinecrew-techdocs.hf.space/auth/verify/{verifiction_token}' email_body_params = { - "username": data.username, - "verify_link": verification_link + 'username': data.username, + 'verify_link': verification_link } message = MessageSchema( - subject="Welcome to Techdocs:[Account Verification]", - recipients=[data.email], # List of recipients, as many as you can pass - template_body=email_body_params, + subject='Welcome to Techdocs:[Account Verification]', + recipients=[data.email], + template_body=email_body_params, subtype=MessageType.html ) + bgtasks.add_task(app.state.mail_client.send_message, message=message, template_name='email_verification.html') + DBQueries.insert_to_database('auth', (data.username, Auth.get_password_hash(data.password), '', 0), ['username', 'password', 'email', 'is_verified']) - bgtasks.add_task(app.state.mail_client.send_message, message=message, template_name="email_verification.html") - # await app.state.mail_client.send_message(message=message, template_name="email_verification.html") - - DBQueries.insert_to_database('auth', (data.username, Auth.get_password_hash(data.password), "", 0), - ['username', 'password', 'email', 'is_verified']) - - - response_result.status = 'success' response_result.message = [f'Activate your account by clicking on the link sent to {data.email}.\nMake sure to check your spam folder.'] -def ops_login(data:LoginCreds): - """Wrapper method to handle login process. +def ops_login(data: LoginCreds): + """ + Authenticates the user based on provided login credentials. Args: - data: LoginCreds. User's credentials from the frontend to login to their account. + data: A LoginCreds object containing the username and password for login. Returns: - TokenSchema. A Pydantic BaseModel to return the JWT tokens to the frontend. + A TokenSchema object containing the access_token and refresh_token for the authenticated user. Raises: - InvalidCredentialsException: If account with entered credentials does not exist. + InvalidCredentialsException: If the provided username or password is incorrect. + EmailNotVerifiedException: If the user's email is not verified. """ + # querying database to check if user already exist - response_result = GeneralResponse.get_instance(data={}, - status="not_allowed", - message=["Not authenticated"] - ) + response_result = GeneralResponse.get_instance(data={}, + status='not_allowed', + message=['Not authenticated'] + ) user = DBQueries.fetch_data_from_database('auth', ['username', 'password', 'is_verified'], f"username='{data.username}'") user = list(user) + if len(user) == 0: # user with the entered credentials does not exist raise InvalidCredentialsException(response_result) + user = user[0] if not Auth.verify_password(data.password, user[1]) and Auth.verify_username(data.username, user[0]): # password is incorrect @@ -88,81 +84,124 @@ def ops_login(data:LoginCreds): if not user[2]: raise EmailNotVerifiedException() - # password is correct - return TokenSchema(access_token=Auth.create_access_token(data.username), - refresh_token=Auth.create_access_token(data.username, secret_name='REFRESH'), - ) + return TokenSchema(access_token=Auth.create_access_token(data.username), refresh_token=Auth.create_access_token(data.username, secret_name='REFRESH')) + +def ops_regenerate_api_key(username: str) -> APIKey: + """ + This function is used to regenerate the API key for a given user. -def ops_regenerate_api_key(username:str) -> APIKey: + Args: + username: A string representing the username of the user for whom the API key needs to be regenerated. - user_API_entry = DBQueries.fetch_data_from_database('api_key', 'apikey', f"username='{username}'") + Returns: + An instance of the APIKey class, representing the newly generated API key. + + Raises: + No exceptions are raised by this function. + """ + user_API_entry = DBQueries.fetch_data_from_database( + 'api_key', + 'apikey', + f"username='{username}'") + user_API_entry = list(user_API_entry) apikey = None - if len(user_API_entry) != 0: apikey = APIKey(api_key=Auth.generate_api_key(username)) - DBQueries.update_data_in_database('api_key','apikey',f"username='{username}'", apikey.api_key) - + DBQueries.update_data_in_database('api_key', 'apikey', f"username='{username}'", apikey.api_key) else: apikey = Auth.generate_api_key(username) DBQueries.insert_to_database('api_key', (username, apikey), ['username', 'apikey']) apikey = APIKey(api_key=apikey) - return apikey - - -def ops_inference(source_code:str,api_key:str,username:str): - response_result = GeneralResponse.get_instance(data={}, - status="not_allowed", - message=["Not authenticated"] - ) +def ops_inference(source_code: str, api_key: str, username: str): + """ + This function infers the operation from the source code provided. + + Arguments: + source_code (str): The source code to be analyzed. + api_key (str): The API key for authentication. + username (str): The username for authentication. + + Raises: + InfoNotFoundException: If the user is not found. + InvalidCredentialsException: If the provided API key does not match the stored API key for the user. - user=DBQueries.fetch_data_from_database('api_key', ['apikey'], f"username='{username}'") + Returns: + str: The inferred operation from the source code. + """ + response_result = GeneralResponse.get_instance( + data={}, + status='not_allowed', + message=['Not authenticated']) + + user = DBQueries.fetch_data_from_database('api_key', ['apikey'], f"username='{username}'") if len(list(user)) == 0: - # user with the entered credentials does not exist - raise InfoNotFoundException(response_result,"User not found") - elif list(user)[0][0]!=api_key: + raise InfoNotFoundException(response_result, 'User not found') + elif list(user)[0][0] != api_key: raise InvalidCredentialsException(response_result) - + def generate_docstring(source_code_message: str): + """ + This function generates a docstring for a given Python function based on its source code. + Arguments: + source_code_message -- A string containing the source code of a Python function. - llm_response = app.state.llmchain.run({"instruction": source_code_message}) + Returns: + A detailed docstring for the given Python function. - docstring = Inference(docstr=llm_response) - + Raises: + Exception -- If the source code message is not a string, or if it's not a valid Python function. + """ + llm_response = app.state.llmchain.run({'instruction': source_code_message}) + docstring = Inference(docstr=llm_response) return docstring - return generate_docstring(source_code) +def ops_verify_email(request: Request, response_result: GeneralResponse, token: str): + """ + This function verifies the email address provided in the token. + + Args: + request: The current request object. (type: Request) + response_result: The response object to hold the result of the operation. (type: GeneralResponse) + token: The token containing the email address to verify. (type: str) + + Returns: + A TemplateResponse object containing the appropriate HTML template based on the verification result. (type: app.state.templates.TemplateResponse) + + Raises: + jwt.JWTError: If there's an error while decoding the JWT token. + ValidationError: If there's an error while validating the token. + InfoNotFoundException: If the user associated with the token is not found in the database. -def ops_verify_email(request: Request, response_result: GeneralResponse, token:str): + """ try: - payload = jwt.decode( - token, config.JWT_VERIFICATION_SECRET_KEY, algorithms=[config.ALGORITHM] - ) + payload = jwt.decode(token, config.JWT_VERIFICATION_SECRET_KEY, algorithms=[config.ALGORITHM]) token_data = TokenPayload(**payload) - if datetime.fromtimestamp(token_data.exp)< datetime.now(): - return app.state.templates.TemplateResponse("verification_failure.html", context={"request": request}) + if datetime.fromtimestamp(token_data.exp) < datetime.now(): + return app.state.templates.TemplateResponse('verification_failure.html', context={'request': request}) + username, email = token_data.sub.split(' ', maxsplit=1) registered_email = DBQueries.fetch_data_from_database('auth', ['is_verified'], f"username='{username}'") + registered_email = list(registered_email) if len(registered_email) == 0: - raise InfoNotFoundException(response_result,"User not found") - print(registered_email[0][0]) + raise InfoNotFoundException(response_result, 'User not found') + + # print(registered_email[0][0]) if registered_email[0][0]: - return app.state.templates.TemplateResponse("verification_failure.html", context={"request": request}) + return app.state.templates.TemplateResponse('verification_failure.html', context={'request': request}) + + DBQueries.update_data_in_database('auth', 'is_verified', f"username='{username}'", (True,)) + DBQueries.update_data_in_database('auth', 'email', f"username='{username}'", email) - - DBQueries.update_data_in_database('auth','is_verified',f"username='{username}'", (True,)) - DBQueries.update_data_in_database('auth','email',f"username='{username}'", email) response_result.status = 'success' response_result.message = [f'Email verified successfully'] - return app.state.templates.TemplateResponse("verification_success.html", context={"request": request}) - + return app.state.templates.TemplateResponse('verification_success.html', context={'request': request}) except (jwt.JWTError, ValidationError): - return app.state.templates.TemplateResponse("verification_failure.html", context={"request": request}) - \ No newline at end of file + return app.state.templates.TemplateResponse('verification_failure.html', context={'request': request}) \ No newline at end of file diff --git a/TechdocsAPI/backend/services/auth/utils/auth_funcs.py b/TechdocsAPI/backend/services/auth/utils/auth_funcs.py index 463a0b4..95eb601 100644 --- a/TechdocsAPI/backend/services/auth/utils/auth_funcs.py +++ b/TechdocsAPI/backend/services/auth/utils/auth_funcs.py @@ -4,24 +4,14 @@ from datetime import datetime, timedelta from typing import Union, Any import secrets - from jose import jwt from passlib.context import CryptContext from pydantic import ValidationError - from fastapi.exceptions import HTTPException - from backend.core.ConfigEnv import config from backend.core.Exceptions import * from backend.models import TokenPayload, TokenSchema - - -token_expiry_info = { -'ACCESS_TOKEN_EXPIRE_MINUTES': 30, # 30 minutes -'REFRESH_TOKEN_EXPIRE_MINUTES': 60 * 24 * 3, # 3 days -'VERIFICATION_TOKEN_EXPIRE_MINUTES': 20, # 20 minutes -} - +token_expiry_info = {'ACCESS_TOKEN_EXPIRE_MINUTES': 30, 'REFRESH_TOKEN_EXPIRE_MINUTES': 60 * 24 * 3, 'VERIFICATION_TOKEN_EXPIRE_MINUTES': 20} class Auth: """Utility class to perform - 1.encryption via `bcrypt` scheme. @@ -32,72 +22,78 @@ class Auth: pwd_context: CryptContext. Helper for hashing & verifying passwords using `bcrypt` algorithm. """ - pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") - - + pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto') @classmethod - def get_password_hash(cls,password: str) -> str: - """Encrypts the entered password. + def get_password_hash(cls, password: str) -> str: + """ + This method is a class method that generates a hashed version of the provided password. - Args: - password: str. Entered password. + Args: + password: The password to be hashed. It should be a string. - Returns: - returns hashed(encrypted) password string. - """ + Returns: + A hashed version of the password. The return type is a string. + + Raises: + No exceptions are raised by this method. + """ return cls.pwd_context.hash(password) - @classmethod + @classmethod def verify_password(cls, plain_password: str, hashed_password: str) -> bool: - """Validates if the entered password matches the actual password. + """ + This method verifies if the provided plain text password matches with the hashed password. - Args: - plain_password: str. Entered password by user. - hashed_password: str. hashed password from the database. + Args: + plain_password (str): The plain text password entered by the user. + hashed_password (str): The hashed password stored in the database. - Returns: - bool value indicating whether the passwords match or not. - """ + Returns: + bool: Returns True if the plain_password matches with the hashed_password, otherwise False. + + Raises: + None + """ return cls.pwd_context.verify(plain_password, hashed_password) @staticmethod def verify_username(entered_username: str, db_username: str) -> bool: - """Validates if the entered username matches the actual username. + """ + This function is used to verify if the entered username matches the username stored in the database. - Args: - entered_username: str. Entered `username` by user. - db_username: str. username from the database. + Arguments: + entered_username {str} -- The username entered by the user. + db_username {str} -- The username stored in the database. - Returns: - bool value indicating whether the village names match or not. - """ + Returns: + bool -- Returns True if the entered username matches the username stored in the database, False otherwise. + """ return entered_username == db_username @staticmethod - def create_access_token(subject: Union[str, Any], expires_delta: int = None, secret_name: str = None) -> str: - """Creates JWT access token. - - Args: - subject: Union[Any, str]. Hash_key to generate access token from. - expires_delta: int = None. Expiry time for the JWT. - - Returns: - encoded_jwt: str. Encoded JWT token from the subject of interest. + def create_access_token(subject: Union[str, Any], expires_delta: int=None, secret_name: str=None) -> str: """ +@staticmethod +def create_access_token(subject: Union[str, Any], expires_delta: int=None, secret_name: str=None) -> str: + This function creates an access token using the JWT (JSON Web Token) standard. + + :param subject: The subject of the token. This can be any object that identifies the token's subject. + :param expires_delta: (Optional) The time delta in which the token will expire. If not provided, the token will expire after 'ACCESS_TOKEN_EXPIRE_MINUTES' minutes. + :param secret_name: (Optional) The name of the secret key to use for signing the token. If not provided, 'JWT_SECRET_KEY' will be used. + :raises: If any error occurs while creating the token. + :returns: The created access token in the form of a string. +""" secret_key = config.JWT_SECRET_KEY token_expiration = token_expiry_info['ACCESS_TOKEN_EXPIRE_MINUTES'] - if secret_name is not None: - secret_key = config.dict()["JWT_" + secret_name.upper() + "_SECRET_KEY"] - token_expiration = token_expiry_info[secret_name.upper() + "_TOKEN_EXPIRE_MINUTES"] - + secret_key = config.dict()['JWT_' + secret_name.upper() + '_SECRET_KEY'] + token_expiration = token_expiry_info[secret_name.upper() + '_TOKEN_EXPIRE_MINUTES'] if expires_delta is not None: expires_delta = datetime.utcnow() + expires_delta else: expires_delta = datetime.utcnow() + timedelta(minutes=token_expiration) - - to_encode = {"exp": expires_delta, "sub": str(subject)} + to_encode = {'exp': expires_delta, 'sub': str(subject)} encoded_jwt = jwt.encode(to_encode, secret_key, config.ALGORITHM) return encoded_jwt @@ -116,17 +112,12 @@ def generate_access_tokens_from_refresh_tokens(token: str) -> TokenSchema: LoginFailedException: If the current refresh access token is invalid. """ - tokens = TokenSchema.get_instance( - access_token= "", - refresh_token= "", - ) + tokens = TokenSchema.get_instance(access_token='', refresh_token='') try: - payload = jwt.decode( - token, config.JWT_REFRESH_SECRET_KEY, algorithms=[config.ALGORITHM] - ) + payload = jwt.decode(token, config.JWT_REFRESH_SECRET_KEY, algorithms=[config.ALGORITHM]) token_data = TokenPayload(**payload) - if datetime.fromtimestamp(token_data.exp)< datetime.now(): - raise HTTPException(status_code=403, detail="Invalid token or expired token.") + if datetime.fromtimestamp(token_data.exp) < datetime.now(): + raise HTTPException(status_code=403, detail='Invalid token or expired token.') except (jwt.JWTError, ValidationError): raise InvalidCredentialsException(tokens) tokens['access_token'] = Auth.create_access_token(token_data.sub) @@ -135,24 +126,53 @@ def generate_access_tokens_from_refresh_tokens(token: str) -> TokenSchema: @classmethod def generate_api_key(cls, username: str): + """ + This method generates an API key for the given username. + + Args: + username: The username for which the API key is to be generated. + + Returns: + A string representing the generated API key. + + Raises: + TypeError: If the username is not a string. + """ return cls.get_password_hash(username + secrets.token_urlsafe(25 - len(username))) - + @classmethod - def get_user_credentials(cls,access_token:str): - response_result = GeneralResponse.get_instance(data={}, - status="not_allowed", - message=["Not authenticated"] - ) + def get_user_credentials(cls, access_token: str): + """ + This method is used to get the user credentials based on the provided access token. + + Args: + access_token: A string representing the access token. + + Returns: + The user credentials in the form of a string. + + Raises: + InvalidCredentialsException: If the access token is invalid or the user is not authenticated. + + """ + response_result = GeneralResponse.get_instance(data={}, status='not_allowed', message=['Not authenticated']) try: - payload = jwt.decode( - access_token, config.JWT_SECRET_KEY, algorithms=[config.ALGORITHM] - ) + payload = jwt.decode(access_token, config.JWT_SECRET_KEY, algorithms=[config.ALGORITHM]) token_data = TokenPayload(**payload) return token_data.sub except (jwt.JWTError, ValidationError): raise InvalidCredentialsException(response_result) - + @classmethod - def verify_apikey(cls,user_api_key:str,true_api_key:str): - return user_api_key == true_api_key + def verify_apikey(cls, user_api_key: str, true_api_key: str): + """ + This method is a class method that verifies the provided API key against the true API key. + + Args: + user_api_key (str): The API key provided by the user. + true_api_key (str): The true API key to compare with the user's API key. + Returns: + bool: Returns True if the user's API key matches the true API key, and False otherwise. + """ + return user_api_key == true_api_key \ No newline at end of file diff --git a/TechdocsAPI/backend/services/db/utils/DBQueries.py b/TechdocsAPI/backend/services/db/utils/DBQueries.py index 6100f0e..2a4efef 100644 --- a/TechdocsAPI/backend/services/db/utils/DBQueries.py +++ b/TechdocsAPI/backend/services/db/utils/DBQueries.py @@ -1,12 +1,25 @@ from typing import Union, Tuple, List - from backend.utils import DBConnection from backend.core.Exceptions import * - class DBQueries: + @classmethod - def insert_to_database(cls, table_name:str, data:Union[Tuple, List[Tuple]], cols:List[str]=None): + def insert_to_database(cls, table_name: str, data: Union[Tuple, List[Tuple]], cols: List[str]=None): + """ + This method is used to insert data into a specified table in the database. + + Args: + table_name (str): The name of the table where the data will be inserted. + data (Union[Tuple, List[Tuple]]): The data to be inserted into the table. It can be either a tuple or a list of tuples. + cols (List[str], optional): A list of column names in the table. If not provided, the method will assume that all columns are included. Defaults to None. + + Raises: + Exception: If there is an error while connecting to the database or executing the query. + + Returns: + None: This method does not return anything. + """ con = DBConnection.get_client() cursor = con.cursor() QUERY = ('INSERT INTO {table_name} ' @@ -14,47 +27,68 @@ def insert_to_database(cls, table_name:str, data:Union[Tuple, List[Tuple]], cols 'VALUES ' ).format(table_name=table_name) if isinstance(data, list): - QUERY+="("+",".join(["%s" for _ in range(len(data[0]))])+")" + QUERY += '(' + ','.join(['%s' for _ in range(len(data[0]))]) + ')' cursor.executemany(QUERY, data) else: - QUERY+="("+",".join(["%s" for _ in range(len(data))])+")" + QUERY += '(' + ','.join(['%s' for _ in range(len(data))]) + ')' cursor.execute(QUERY, data) con.commit() - + @classmethod - def fetch_data_from_database(cls,table_name:str,cols_to_fetch:Union[str, List[str]], where_clause:str=None): + def fetch_data_from_database(cls, table_name: str, cols_to_fetch: Union[str, List[str]], where_clause: str=None): + """ + This method fetches data from a specified table in the database based on the specified column(s) and optional WHERE clause. + + Args: + table_name (str): The name of the table from which to fetch data. + cols_to_fetch (Union[str, List[str]]): The column(s) to fetch from the table. Can be a single string or a list of strings. + where_clause (str, optional): An optional WHERE clause to filter the fetched data. Defaults to None. + + Returns: + List[tuple]: A list of tuples representing the fetched data, where each tuple corresponds to a row and contains the values of the fetched columns in the order specified. + + Raises: + None + """ con = DBConnection.get_client() cursor = con.cursor() if isinstance(cols_to_fetch, str): cols_to_fetch = [cols_to_fetch] - cols_to_fetch = ", ".join(cols_to_fetch) + cols_to_fetch = ', '.join(cols_to_fetch) QUERY = ('SELECT {cols} FROM {table_name}').format(cols=cols_to_fetch, table_name=table_name) if where_clause: - QUERY = QUERY + " WHERE " + where_clause + QUERY = QUERY + ' WHERE ' + where_clause cursor.execute(QUERY) return cursor.fetchall() - + @classmethod - def update_data_in_database(cls, table_name:str, cols_to_update:Union[str, List[str]], where_clause:str=None, new_values:Union[str, List[str]]=None): + def update_data_in_database(cls, table_name: str, cols_to_update: Union[str, List[str]], where_clause: str=None, new_values: Union[str, List[str]]=None): + """ + This function is used to update the data in a specific table in the database. + + Args: + table_name (str): The name of the table where the data needs to be updated. + cols_to_update (Union[str, List[str]]): The column(s) to be updated. If a string, it should end with '=%s'. If a list, it should contain the column names separated by '=%s, '.join(cols_to_update). + where_clause (str, optional): The WHERE clause to specify the condition for updating the data. Defaults to None. + new_values (Union[str, List[str]], optional): The new values to be updated in the columns. If a string, it should be a list [new_value]. If a list, it should contain the new values corresponding to the columns. Defaults to None. + + Returns: + bool: Returns True if the data is successfully updated in the database. + + Raises: + None + """ con = DBConnection.get_client() cursor = con.cursor() if isinstance(cols_to_update, str): - cols_to_update = cols_to_update + "=%s" + cols_to_update = cols_to_update + '=%s' else: - cols_to_update = "=%s, ".join(cols_to_update) - + cols_to_update = '=%s, '.join(cols_to_update) if isinstance(new_values, str): new_values = [new_values] - QUERY = ('UPDATE {table_name} SET {cols}').format(table_name=table_name, cols=cols_to_update) if where_clause: - QUERY = QUERY + " WHERE " + where_clause + QUERY = QUERY + ' WHERE ' + where_clause cursor.execute(QUERY, new_values) con.commit() - return True - - - - - - + return True \ No newline at end of file diff --git a/frontend/components/authors.py b/frontend/components/authors.py index 882015f..5f8490f 100644 --- a/frontend/components/authors.py +++ b/frontend/components/authors.py @@ -2,9 +2,18 @@ import streamlit as st def authors(): - # {"Github β†’ " "["username"]" "("url")" for username,url in author_details["socials"].items() } - # with open("frontend/contend/authors.json","r") as f: - # author_details = json.load(f) + """ + This function displays the Github profiles of the authors. + + Args: + None + + Returns: + None + + Raises: + None + """ st.sidebar.divider() st.sidebar.info( """ diff --git a/frontend/components/login.py b/frontend/components/login.py index 6b400df..20b65c4 100644 --- a/frontend/components/login.py +++ b/frontend/components/login.py @@ -1,69 +1,53 @@ import json import requests - import streamlit as st def login(): - + """ + login function creates a user interface for Techdocs CLI. + """ base_url = 'https://caffeinecrew-techdocs.hf.space' - - - headers={"accept":"application/json"} - - tab1, tab2 = st.tabs(["Login", "Signup"]) - + headers = {'accept': 'application/json'} + tab1, tab2 = st.tabs(['Login', 'Signup']) with tab1: - with st.form(key="myform2"): - username = st.text_input(label="Username", label_visibility="collapsed", placeholder="Username") - password = st.text_input(label="Password", label_visibility="collapsed", placeholder="Password", type="password") - login_button = st.form_submit_button(label="Login") - - with st.spinner("Logging in..."): + with st.form(key='myform2'): + username = st.text_input(label='Username', label_visibility='collapsed', placeholder='Username') + password = st.text_input(label='Password', label_visibility='collapsed', placeholder='Password', type='password') + login_button = st.form_submit_button(label='Login') + with st.spinner('Logging in...'): if login_button: try: - credentials = {"username":username, "password":password} - response = requests.post(base_url + "/auth/login", headers=headers, data=json.dumps(credentials)) - if (response.status_code!=200): - raise Exception("Login Failed") - # res_dict.update(response.json()) - st.session_state["username"] = username - st.session_state["access_token"] = response.json()['access_token'] - st.session_state["refresh_token"] = response.json()['refresh_token'] - st.success("Logged in successfully") + credentials = {'username': username, 'password': password} + response = requests.post(base_url + '/auth/login', headers=headers, data=json.dumps(credentials)) + if response.status_code != 200: + raise Exception('Login Failed') + st.session_state['username'] = username + st.session_state['access_token'] = response.json()['access_token'] + st.session_state['refresh_token'] = response.json()['refresh_token'] + st.success('Logged in successfully') st.rerun() - except Exception as e: - if response.json() == "exception.InvalidCredentialsException()": - st.error("Please check your credentials") + if response.json() == 'exception.InvalidCredentialsException()': + st.error('Please check your credentials') else: - st.warning("Please verify your email before logging in") - + st.warning('Please verify your email before logging in') with tab2: - with st.form(key="myform1"): - username = st.text_input(label="Username", label_visibility="collapsed", placeholder="Username") - password = st.text_input(label="Password", label_visibility="collapsed", placeholder="Password", type="password") - email = st.text_input(label="Email", label_visibility="collapsed", placeholder="Email") - signup_button = st.form_submit_button(label="Signup") - - with st.spinner("Signing up..."): + with st.form(key='myform1'): + username = st.text_input(label='Username', label_visibility='collapsed', placeholder='Username') + password = st.text_input(label='Password', label_visibility='collapsed', placeholder='Password', type='password') + email = st.text_input(label='Email', label_visibility='collapsed', placeholder='Email') + signup_button = st.form_submit_button(label='Signup') + with st.spinner('Signing up...'): if signup_button: try: - credentials = {"username":username, "password":password, "email":email} - response = requests.post(url=base_url + "/auth/signup", headers=headers, data=json.dumps(credentials)) - if (response.status_code!=200): - raise Exception("Signup Failed") - - st.success(response.json()['message'][0].replace("\\n","\n")) + credentials = {'username': username, 'password': password, 'email': email} + response = requests.post(url=base_url + '/auth/signup', headers=headers, data=json.dumps(credentials)) + if response.status_code != 200: + raise Exception('Signup Failed') + st.success(response.json()['message'][0].replace('\\n', '\n')) except: - st.error("Account with this Username or Email already exists") - - st.divider() - st.subheader(":rainbow[Our Prototype in Action ]🎬") - with st.expander("Demo video 🎬",expanded=True): - st.video("frontend/images/Showcase.mp4") - - - - - - + st.error('Account with this Username or Email already exists') + st.divider() + st.subheader(':rainbow[Our Prototype in Action ]🎬') + with st.expander('Demo video 🎬', expanded=True): + st.video('frontend/images/Showcase.mp4') \ No newline at end of file diff --git a/frontend/components/logo.py b/frontend/components/logo.py index 4cd8ed0..3c32a57 100644 --- a/frontend/components/logo.py +++ b/frontend/components/logo.py @@ -1,22 +1,31 @@ import streamlit as st - import re import base64 import validators from pathlib import Path -def add_logo(logo_url: str, height: int = 100, svg=False): +def add_logo(logo_url: str, height: int=100, svg=False): + """ +Adds a logo to the sidebar navigation. + +:param logo_url: The URL of the logo file. +:type logo_url: str +:param height: The height of the logo in pixels. Default is 100. +:type height: int +:param svg: If True, the logo is an SVG file. Default is False. +:type svg: bool +:raises: None +:returns: None +""" if svg: svg_logo = read_svg(logo_url) - b64 = base64.b64encode(svg_logo.encode("utf-8")).decode("utf-8") + b64 = base64.b64encode(svg_logo.encode('utf-8')).decode('utf-8') logo = f'url("data:image/svg+xml;base64,{b64}")' - elif validators.url(logo_url): logo = f'url({logo_url})' else: logo = f'url("data:image/png;base64,{base64.b64encode(Path(logo_url).read_bytes()).decode()}")' - st.markdown( f"""