From f7a61e29f4948370758b7e41b01ae45cfc4d8516 Mon Sep 17 00:00:00 2001 From: Haseeb Date: Sun, 27 Oct 2024 03:19:24 +0400 Subject: [PATCH] Refactor code spaced to tabs --- libs/code_interpreter.py | 366 +++---- libs/gemini_vision.py | 124 +-- libs/history_manager.py | 190 ++-- libs/interpreter_lib.py | 1961 +++++++++++++++++++------------------- libs/logger.py | 102 +- libs/markdown_code.py | 112 +-- libs/package_manager.py | 382 ++++---- libs/utility_manager.py | 538 +++++------ 8 files changed, 1888 insertions(+), 1887 deletions(-) diff --git a/libs/code_interpreter.py b/libs/code_interpreter.py index 3a940f5..0002e40 100644 --- a/libs/code_interpreter.py +++ b/libs/code_interpreter.py @@ -16,198 +16,198 @@ class CodeInterpreter: - def __init__(self): - self.logger = Logger.initialize_logger("logs/code-interpreter.log") - - def _execute_script(self, script: str, shell: str): - stdout = stderr = None - try: - if shell == "bash": - process = subprocess.Popen(['bash', '-c', script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - elif shell == "powershell": - process = subprocess.Popen(['powershell', '-Command', script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - elif shell == "applescript": - process = subprocess.Popen(['osascript', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - else: - self.logger.error(f"Invalid shell selected: {shell}") - return None, f"Invalid shell selected: {shell}" - stdout, stderr = process.communicate() - self.logger.info(f"Output is {stdout.decode()} and error is {stderr.decode()}") - if process.returncode != 0: - self.logger.error(f"Error in running {shell} script: {stderr.decode()}") - except Exception as exception: - self.logger.error(f"Exception in running {shell} script: {str(exception)}") - stderr = str(exception) - finally: - return stdout.decode().strip() if stdout else None, stderr.decode().strip() if stderr else None - - def _check_compilers(self, language): - try: - language = language.lower().strip() - - compilers = { - "python": ["python", "--version"], - "javascript": ["node", "--version"], - } + def __init__(self): + self.logger = Logger.initialize_logger("logs/code-interpreter.log") + + def _execute_script(self, script: str, shell: str): + stdout = stderr = None + try: + if shell == "bash": + process = subprocess.Popen(['bash', '-c', script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + elif shell == "powershell": + process = subprocess.Popen(['powershell', '-Command', script], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + elif shell == "applescript": + process = subprocess.Popen(['osascript', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + else: + self.logger.error(f"Invalid shell selected: {shell}") + return None, f"Invalid shell selected: {shell}" + stdout, stderr = process.communicate() + self.logger.info(f"Output is {stdout.decode()} and error is {stderr.decode()}") + if process.returncode != 0: + self.logger.error(f"Error in running {shell} script: {stderr.decode()}") + except Exception as exception: + self.logger.error(f"Exception in running {shell} script: {str(exception)}") + stderr = str(exception) + finally: + return stdout.decode().strip() if stdout else None, stderr.decode().strip() if stderr else None + + def _check_compilers(self, language): + try: + language = language.lower().strip() + + compilers = { + "python": ["python", "--version"], + "javascript": ["node", "--version"], + } - if language not in compilers: - self.logger.error("Invalid language selected.") - return False + if language not in compilers: + self.logger.error("Invalid language selected.") + return False - compiler = subprocess.run(compilers[language], capture_output=True, text=True) - if compiler.returncode != 0: - self.logger.error(f"{language.capitalize()} compiler not found.") - return False + compiler = subprocess.run(compilers[language], capture_output=True, text=True) + if compiler.returncode != 0: + self.logger.error(f"{language.capitalize()} compiler not found.") + return False - return True - except Exception as exception: - self.logger.error(f"Error occurred while checking compilers: {exception}") - raise Exception(f"Error occurred while checking compilers: {exception}") - - def save_code(self, filename='output/code_generated.py', code=None): - """ - Saves the provided code to a file. - The default filename is 'code_generated.py'. - """ - try: - # Check if the directory exists, if not create it - directory = os.path.dirname(filename) - if not os.path.exists(directory): - os.makedirs(directory) - - if not code: - self.logger.error("Code not provided.") - display_markdown_message("Error **Code not provided to save.**") - return + return True + except Exception as exception: + self.logger.error(f"Error occurred while checking compilers: {exception}") + raise Exception(f"Error occurred while checking compilers: {exception}") + + def save_code(self, filename='output/code_generated.py', code=None): + """ + Saves the provided code to a file. + The default filename is 'code_generated.py'. + """ + try: + # Check if the directory exists, if not create it + directory = os.path.dirname(filename) + if not os.path.exists(directory): + os.makedirs(directory) + + if not code: + self.logger.error("Code not provided.") + display_markdown_message("Error **Code not provided to save.**") + return - with open(filename, 'w') as file: - file.write(code) - self.logger.info(f"Code saved successfully to {filename}.") - except Exception as exception: - self.logger.error(f"Error occurred while saving code to file: {exception}") - raise Exception(f"Error occurred while saving code to file: {exception}") + with open(filename, 'w') as file: + file.write(code) + self.logger.info(f"Code saved successfully to {filename}.") + except Exception as exception: + self.logger.error(f"Error occurred while saving code to file: {exception}") + raise Exception(f"Error occurred while saving code to file: {exception}") - def extract_code(self, code:str, start_sep='```', end_sep='```',skip_first_line=False,code_mode=False): - """ - Extracts the code from the provided string. - If the string contains the start and end separators, it extracts the code between them. - Otherwise, it returns the original string. - """ - try: - has_newline = False - if start_sep in code and end_sep in code: - start = code.find(start_sep) + len(start_sep) - # Skip the newline character after the start separator - if code[start] == '\n': - start += 1 - has_newline = True - - end = code.find(end_sep, start) - # Skip the newline character before the end separator - if code[end - 1] == '\n': - end -= 1 - - if skip_first_line and code_mode and not has_newline: - # Skip the first line after the start separator - start = code.find('\n', start) + 1 - - extracted_code = code[start:end] - # Remove extra words for commands present. - if not code_mode and 'bash' in extracted_code: - extracted_code = extracted_code.replace('bash', '') - - self.logger.info("Code extracted successfully.") - return extracted_code - else: - self.logger.info("No special characters found in the code. Returning the original code.") - return code - except Exception as exception: - self.logger.error(f"Error occurred while extracting code: {exception}") - raise Exception(f"Error occurred while extracting code: {exception}") - - def execute_code(self, code, language): - try: - language = language.lower() - self.logger.info(f"Running code: {code[:100]} in language: {language}") + def extract_code(self, code:str, start_sep='```', end_sep='```',skip_first_line=False,code_mode=False): + """ + Extracts the code from the provided string. + If the string contains the start and end separators, it extracts the code between them. + Otherwise, it returns the original string. + """ + try: + has_newline = False + if start_sep in code and end_sep in code: + start = code.find(start_sep) + len(start_sep) + # Skip the newline character after the start separator + if code[start] == '\n': + start += 1 + has_newline = True + + end = code.find(end_sep, start) + # Skip the newline character before the end separator + if code[end - 1] == '\n': + end -= 1 + + if skip_first_line and code_mode and not has_newline: + # Skip the first line after the start separator + start = code.find('\n', start) + 1 + + extracted_code = code[start:end] + # Remove extra words for commands present. + if not code_mode and 'bash' in extracted_code: + extracted_code = extracted_code.replace('bash', '') + + self.logger.info("Code extracted successfully.") + return extracted_code + else: + self.logger.info("No special characters found in the code. Returning the original code.") + return code + except Exception as exception: + self.logger.error(f"Error occurred while extracting code: {exception}") + raise Exception(f"Error occurred while extracting code: {exception}") + + def execute_code(self, code, language): + try: + language = language.lower() + self.logger.info(f"Running code: {code[:100]} in language: {language}") - # Check for code and language validity - if not code or len(code.strip()) == 0: - return "Code is empty. Cannot execute an empty code." - - # Check for compilers on the system - compilers_status = self._check_compilers(language) - if not compilers_status: - raise Exception("Compilers not found. Please install compilers on your system.") - - if language == "python": - process = subprocess.Popen(["python", "-c", code], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = process.communicate() - stdout_output = stdout.decode("utf-8") - stderr_output = stderr.decode("utf-8") - self.logger.info(f"Python Output execution: {stdout_output}, Errors: {stderr_output}") - return stdout_output, stderr_output - - elif language == "javascript": - process = subprocess.Popen(["node", "-e", code], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = process.communicate() - stdout_output = stdout.decode("utf-8") - stderr_output = stderr.decode("utf-8") - self.logger.info(f"JavaScript Output execution: {stdout_output}, Errors: {stderr_output}") - return stdout_output, stderr_output - - else: - self.logger.info("Unsupported language.") - raise Exception("Unsupported language.") - - except Exception as exception: - self.logger.error(f"Exception in running code: {str(exception)}") - raise exception - - def execute_script(self, script:str, os_type:str='macos'): - output = error = None - try: - if not script: - raise ValueError("Script must be provided.") - if not os_type: - raise ValueError("OS type must be provided.") + # Check for code and language validity + if not code or len(code.strip()) == 0: + return "Code is empty. Cannot execute an empty code." + + # Check for compilers on the system + compilers_status = self._check_compilers(language) + if not compilers_status: + raise Exception("Compilers not found. Please install compilers on your system.") + + if language == "python": + process = subprocess.Popen(["python", "-c", code], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + stdout_output = stdout.decode("utf-8") + stderr_output = stderr.decode("utf-8") + self.logger.info(f"Python Output execution: {stdout_output}, Errors: {stderr_output}") + return stdout_output, stderr_output + + elif language == "javascript": + process = subprocess.Popen(["node", "-e", code], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + stdout_output = stdout.decode("utf-8") + stderr_output = stderr.decode("utf-8") + self.logger.info(f"JavaScript Output execution: {stdout_output}, Errors: {stderr_output}") + return stdout_output, stderr_output + + else: + self.logger.info("Unsupported language.") + raise Exception("Unsupported language.") + + except Exception as exception: + self.logger.error(f"Exception in running code: {str(exception)}") + raise exception + + def execute_script(self, script:str, os_type:str='macos'): + output = error = None + try: + if not script: + raise ValueError("Script must be provided.") + if not os_type: + raise ValueError("OS type must be provided.") - self.logger.info(f"Attempting to execute script: {script[:50]}") - if any(os in os_type.lower() for os in ['darwin', 'macos']): - output, error = self._execute_script(script, shell='applescript') - elif 'linux' in os_type.lower(): - output, error = self._execute_script(script, shell='bash') - elif 'windows' in os_type.lower(): - output, error = self._execute_script(script, shell='powershell') - else: - raise ValueError(f"Invalid OS type '{os_type}'. Please provide 'macos', 'linux', or 'windows'.") + self.logger.info(f"Attempting to execute script: {script[:50]}") + if any(os in os_type.lower() for os in ['darwin', 'macos']): + output, error = self._execute_script(script, shell='applescript') + elif 'linux' in os_type.lower(): + output, error = self._execute_script(script, shell='bash') + elif 'windows' in os_type.lower(): + output, error = self._execute_script(script, shell='powershell') + else: + raise ValueError(f"Invalid OS type '{os_type}'. Please provide 'macos', 'linux', or 'windows'.") - if output: - self.logger.info(f"Script executed successfully with output: {output[:50]}...") - if error: - self.logger.error(f"Script executed with error: {error}...") - except Exception as exception: - self.logger.error(f"Error in executing script: {traceback.format_exc()}") - error = str(exception) - finally: - return output, error - - def execute_command(self, command:str): - try: - if not command: - raise ValueError("Command must be provided.") + if output: + self.logger.info(f"Script executed successfully with output: {output[:50]}...") + if error: + self.logger.error(f"Script executed with error: {error}...") + except Exception as exception: + self.logger.error(f"Error in executing script: {traceback.format_exc()}") + error = str(exception) + finally: + return output, error + + def execute_command(self, command:str): + try: + if not command: + raise ValueError("Command must be provided.") - self.logger.info(f"Attempting to execute command: {command}") - process = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self.logger.info(f"Attempting to execute command: {command}") + process = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout_output = process.stdout.decode("utf-8") - stderr_output = process.stderr.decode("utf-8") + stdout_output = process.stdout.decode("utf-8") + stderr_output = process.stderr.decode("utf-8") - if stdout_output: - self.logger.info(f"Command executed successfully with output: {stdout_output}") - if stderr_output: - self.logger.error(f"Command executed with error: {stderr_output}") + if stdout_output: + self.logger.info(f"Command executed successfully with output: {stdout_output}") + if stderr_output: + self.logger.error(f"Command executed with error: {stderr_output}") - return stdout_output, stderr_output - except Exception as exception: - self.logger.error(f"Error in executing command: {str(exception)}") - raise exception + return stdout_output, stderr_output + except Exception as exception: + self.logger.error(f"Error in executing command: {str(exception)}") + raise exception diff --git a/libs/gemini_vision.py b/libs/gemini_vision.py index 96ceabe..8105a6e 100644 --- a/libs/gemini_vision.py +++ b/libs/gemini_vision.py @@ -4,71 +4,71 @@ import litellm class GeminiVision: - def __init__(self, api_key=None) -> None: - self.logger = Logger.initialize_logger('logs/vision_interpreter.log') - self.logger.info(f"Initializing Gemini Vision") - self.api_key = api_key - - if self.api_key is None: - self.logger.warning("API key is not initialized") + def __init__(self, api_key=None) -> None: + self.logger = Logger.initialize_logger('logs/vision_interpreter.log') + self.logger.info("Initializing Gemini Vision") + self.api_key = api_key + + if self.api_key is None: + self.logger.warning("API key is not initialized") - # load the key from the .env file - load_dotenv() - api_key = os.getenv("GEMINI_API_KEY") - if not api_key: - self.logger.error("No API key found in the .env file") - raise ValueError("No API key found in the .env file") - - self.logger.info(f"Gemini Vision configured success") - self.logger.info(f"Model setup success") + # load the key from the .env file + load_dotenv() + api_key = os.getenv("GEMINI_API_KEY") + if not api_key: + self.logger.error("No API key found in the .env file") + raise ValueError("No API key found in the .env file") + + self.logger.info("Gemini Vision configured success") + self.logger.info("Model setup success") - def generate_text(self, prompt, image_url): - self.logger.info(f"Generating contents") - - # Create the messages payload according to the documentation - messages = [ - { - "role": "user", - "content": [ - { - "type": "text", - "text": prompt - }, - { - "type": "image_url", - "image_url": {"url": image_url} - } - ] - } - ] + def generate_text(self, prompt, image_url): + self.logger.info("Generating contents") + + # Create the messages payload according to the documentation + messages = [ + { + "role": "user", + "content": [ + { + "type": "text", + "text": prompt + }, + { + "type": "image_url", + "image_url": {"url": image_url} + } + ] + } + ] - # Make the API call to Gemini model - response = litellm.completion( - model="gemini/gemini-pro-vision", - messages=messages, - ) + # Make the API call to Gemini model + response = litellm.completion( + model="gemini/gemini-pro-vision", + messages=messages, + ) - # Extract the response content - return response.get('choices', [{}])[0].get('message', {}).get('content') + # Extract the response content + return response.get('choices', [{}])[0].get('message', {}).get('content') - def gemini_vision_url(self, prompt, image_url): - self.logger.info(f"Generating text from URL: {image_url}") - try: - return self.generate_text(prompt, image_url) - except Exception as exception: - self.logger.error(f"Error generating text from URL: {exception}") - raise + def gemini_vision_url(self, prompt, image_url): + self.logger.info(f"Generating text from URL: {image_url}") + try: + return self.generate_text(prompt, image_url) + except Exception as exception: + self.logger.error(f"Error generating text from URL: {exception}") + raise - def gemini_vision_path(self, prompt, image_path): - self.logger.info(f"Generating text from image path: '{image_path}'") - try: - self.logger.info(f"Checking if image path exists for: '{image_path}'") - - # check if the image path exists - if not os.path.exists(image_path): - raise ValueError(f"Image path does not exist: {image_path}") - - return self.generate_text(prompt,image_path) - except Exception as exception: - self.logger.error(f"Error generating text from image path: {exception}") - raise \ No newline at end of file + def gemini_vision_path(self, prompt, image_path): + self.logger.info(f"Generating text from image path: '{image_path}'") + try: + self.logger.info(f"Checking if image path exists for: '{image_path}'") + + # check if the image path exists + if not os.path.exists(image_path): + raise ValueError(f"Image path does not exist: {image_path}") + + return self.generate_text(prompt,image_path) + except Exception as exception: + self.logger.error(f"Error generating text from image path: {exception}") + raise \ No newline at end of file diff --git a/libs/history_manager.py b/libs/history_manager.py index 1c69eba..55db0ed 100644 --- a/libs/history_manager.py +++ b/libs/history_manager.py @@ -5,108 +5,108 @@ from libs.logger import Logger class History: - def __init__(self, history_file: str): - self.history_file = history_file - self.logger = Logger.initialize_logger("logs/interpreter.log") + def __init__(self, history_file: str): + self.history_file = history_file + self.logger = Logger.initialize_logger("logs/interpreter.log") - def save_history_json(self, task, mode, os_name, language, prompt, code_snippet, code_output, model_name): - try: - history_entry = { - "assistant": { - "task": task, - "mode": mode, - "os": os_name, - "language": language, - "model": model_name - }, - "user": prompt, - "system": { - "code": code_snippet, - "output": code_output - } - } + def save_history_json(self, task, mode, os_name, language, prompt, code_snippet, code_output, model_name): + try: + history_entry = { + "assistant": { + "task": task, + "mode": mode, + "os": os_name, + "language": language, + "model": model_name + }, + "user": prompt, + "system": { + "code": code_snippet, + "output": code_output + } + } - data = [] - if os.path.isfile(self.history_file) and os.path.getsize(self.history_file) > 0: - with open(self.history_file, "r") as history_file: # Open the file in read mode - data = json.load(history_file) + data = [] + if os.path.isfile(self.history_file) and os.path.getsize(self.history_file) > 0: + with open(self.history_file, "r") as history_file: # Open the file in read mode + data = json.load(history_file) - data.append(history_entry) + data.append(history_entry) - with open(self.history_file, "w") as history_file: - json.dump(data, history_file) - except Exception as exception: - self.logger.error(f"Error in saving history to JSON: {str(exception)}") - raise + with open(self.history_file, "w") as history_file: + json.dump(data, history_file) + except Exception as exception: + self.logger.error(f"Error in saving history to JSON: {str(exception)}") + raise - def _get_data_for_key(self, key: str) -> List[Any]: - """Returns a list of all values for the specified key in the history data.""" - try: - if os.path.getsize(self.history_file) > 0: - with open(self.history_file, 'r') as file: - history_data = json.load(file) - else: - return [] - - specific_data = [] - for entry in history_data: - if key in entry['assistant']: - specific_data.append(entry['assistant'].get(key)) - elif key in entry['system']: - specific_data.append(entry['system'].get(key)) - self.logger.info(f'Successfully retrieved {key} data from history') - return specific_data - except Exception as exception: - self.logger.error(f'Error getting {key} data from history: {exception}') - raise + def _get_data_for_key(self, key: str) -> List[Any]: + """Returns a list of all values for the specified key in the history data.""" + try: + if os.path.getsize(self.history_file) > 0: + with open(self.history_file, 'r') as file: + history_data = json.load(file) + else: + return [] + + specific_data = [] + for entry in history_data: + if key in entry['assistant']: + specific_data.append(entry['assistant'].get(key)) + elif key in entry['system']: + specific_data.append(entry['system'].get(key)) + self.logger.info(f'Successfully retrieved {key} data from history') + return specific_data + except Exception as exception: + self.logger.error(f'Error getting {key} data from history: {exception}') + raise - def _get_last_entries(self, count: int) -> List[dict]: - """Returns the last n entries from the history data.""" - try: - with open(self.history_file, 'r') as file: - history_data = json.load(file) - last_entries = history_data[-count:] - self.logger.info(f'Successfully retrieved last {count} entries from history') - return last_entries - except Exception as exception: - self.logger.error(f'Error getting last {count} entries from history: {exception}') - raise + def _get_last_entries(self, count: int) -> List[dict]: + """Returns the last n entries from the history data.""" + try: + with open(self.history_file, 'r') as file: + history_data = json.load(file) + last_entries = history_data[-count:] + self.logger.info(f'Successfully retrieved last {count} entries from history') + return last_entries + except Exception as exception: + self.logger.error(f'Error getting last {count} entries from history: {exception}') + raise - def _get_last_entries_for_key(self, key: str, count: int) -> List[Any]: - """Returns the values of the specified key for the last n entries in the history data.""" - try: - specific_key_data = self._get_data_for_key(key) - last_specific_data = specific_key_data[-count:] - if last_specific_data: - self.logger.info(f'Successfully retrieved {key} data for last {count} entries from history') - self.logger.info(f"\n'{last_specific_data}'\n") - return last_specific_data - else: - self.logger.info(f'No {key} data found in history') - return [] - except Exception as exception: - self.logger.error(f'Error getting {key} data for last {count} entries from history: {exception}') - raise - - def _get_last_entries_for_keys(self, count: int, *keys: str) -> List[dict]: - last_entries = [] - try: - entries = {key: self._get_last_entries_for_key(key, count) for key in keys} - - for index in range(count): - session = {key: entries[key][index] if index < len(entries[key]) else None for key in keys} - last_entries.append(session) - - return last_entries - except Exception as exception: - self.logger.error(f'Error getting last {count} entries for keys {keys} from history: {exception}') - raise + def _get_last_entries_for_key(self, key: str, count: int) -> List[Any]: + """Returns the values of the specified key for the last n entries in the history data.""" + try: + specific_key_data = self._get_data_for_key(key) + last_specific_data = specific_key_data[-count:] + if last_specific_data: + self.logger.info(f'Successfully retrieved {key} data for last {count} entries from history') + self.logger.info(f"\n'{last_specific_data}'\n") + return last_specific_data + else: + self.logger.info(f'No {key} data found in history') + return [] + except Exception as exception: + self.logger.error(f'Error getting {key} data for last {count} entries from history: {exception}') + raise + + def _get_last_entries_for_keys(self, count: int, *keys: str) -> List[dict]: + last_entries = [] + try: + entries = {key: self._get_last_entries_for_key(key, count) for key in keys} + + for index in range(count): + session = {key: entries[key][index] if index < len(entries[key]) else None for key in keys} + last_entries.append(session) + + return last_entries + except Exception as exception: + self.logger.error(f'Error getting last {count} entries for keys {keys} from history: {exception}') + raise - def get_chat_history(self, count: int) -> List[dict]: - return self._get_last_entries_for_keys(count, "task", "output") + def get_chat_history(self, count: int) -> List[dict]: + return self._get_last_entries_for_keys(count, "task", "output") - def get_code_history(self, count: int) -> List[dict]: - return self._get_last_entries_for_keys(count, "code", "output") + def get_code_history(self, count: int) -> List[dict]: + return self._get_last_entries_for_keys(count, "code", "output") - def get_full_history(self, count: int) -> List[dict]: - return self._get_last_entries_for_keys(count, "task", "code", "output") \ No newline at end of file + def get_full_history(self, count: int) -> List[dict]: + return self._get_last_entries_for_keys(count, "task", "code", "output") \ No newline at end of file diff --git a/libs/interpreter_lib.py b/libs/interpreter_lib.py index c290c57..d958532 100644 --- a/libs/interpreter_lib.py +++ b/libs/interpreter_lib.py @@ -26,985 +26,986 @@ import shlex class Interpreter: - logger = None - client = None - interpreter_version = None - - def __init__(self, args): - self.args = args - self.history = [] - self.history_count = 3 - self.history_file = "history/history.json" - self.utility_manager = UtilityManager() - self.code_interpreter = CodeInterpreter() - self.package_manager = PackageManager() - self.history_manager = History(self.history_file) - self.logger = Logger.initialize_logger("logs/interpreter.log") - self.client = None - self.config_values = None - self.system_message = "" - self.gemini_vision = None - self.initialize() - - def initialize(self): - self.INTERPRETER_LANGUAGE = self.args.lang if self.args.lang else 'python' - self.SAVE_CODE = self.args.save_code - self.EXECUTE_CODE = self.args.exec - self.DISPLAY_CODE = self.args.display_code - self.INTERPRETER_MODEL = self.args.model if self.args.model else None - self.logger.info(f"Interpreter args model selected is '{self.args.model}") - self.logger.info(f"Interpreter model selected is '{self.INTERPRETER_MODEL}'") - self.system_message = "" - self.INTERPRETER_MODE = 'code' - - if self.args.file is None: - self.INTERPRETER_PROMPT_FILE = False - self.INTERPRETER_PROMPT_INPUT = True - else: - self.INTERPRETER_PROMPT_FILE = True - self.INTERPRETER_PROMPT_INPUT = False - # If the user didn't provide a file name, use the default one - if self.args.file == '': - self.args.file = 'prompt.txt' - - # Set the history optional(Argparse) - if hasattr(self.args, 'history'): - self.INTERPRETER_HISTORY = self.args.history - else: - self.INTERPRETER_HISTORY = False - - if self.INTERPRETER_MODE == 'vision': - self.system_message = "You are top tier image captioner and image analyzer. Please generate a well-written description of the image that is precise, easy to understand" - elif self.INTERPRETER_MODE == 'chat': - self.system_message = "You are top tier chatbot. Please generate a well-written response that is precise, easy to understand" - else: - # Open file system_message.txt to a variable system_message - try: - with open('system/system_message.txt', 'r') as file: - self.system_message = file.read() - if self.system_message != "": - self.logger.info(f"System message read successfully") - except Exception as exception: - self.logger.error(f"Error occurred while reading system_message.txt: {str(exception)}") - raise - - # Initialize client and mode. - self.initialize_client() - self.initialize_mode() - - try: # Make this as optional step to have readline history. - self.utility_manager.initialize_readline_history() - except: - self.logger.error(f"Exception on initializing readline history") - - def initialize_client(self): - load_dotenv() - self.logger.info("Initializing Client") - - self.logger.info(f"Interpreter model selected is '{self.INTERPRETER_MODEL}'") - if self.INTERPRETER_MODEL is None or self.INTERPRETER_MODEL == "": - self.logger.info("HF_MODEL is not provided, using default model.") - config_file_name = f"configs/gpt-4o.config" # Setting default model to GPT 4o. - else: - config_file_name = f"configs/{self.INTERPRETER_MODEL}.config" - - self.logger.info(f"Reading config file {config_file_name}") - self.config_values = self.utility_manager.read_config_file(config_file_name) - self.INTERPRETER_MODEL = str(self.config_values.get('HF_MODEL', self.INTERPRETER_MODEL)) - hf_model_name = self.INTERPRETER_MODEL.strip().split("/")[-1] - - # skip init client for local models.(Bug#10 https://github.com/haseeb-heaven/code-interpreter/issues/10) - if 'local' in self.INTERPRETER_MODEL: - self.logger.info(f"Skipping client initialization for local model.") - # Add OpenAI API key if not present in the environment variables. (https://github.com/haseeb-heaven/code-interpreter/issues/13) - # Fixed OpenAI API Key name (https://github.com/haseeb-heaven/code-interpreter/issues/20) - api_key = os.environ['OPENAI_API_KEY'] - - if api_key: - self.logger.info(f"Using local API key from environment variables.") - - if api_key is None: - load_dotenv(dotenv_path=os.path.join(os.getcwd(), ".env")) - api_key = os.getenv('OPENAI_API_KEY') - if api_key is None: - self.logger.info(f"Setting default local API key for local models.") - os.environ['OPENAI_API_KEY'] = "sk-1234567890" # Setting default API key for local models. - return - - self.logger.info(f"Using model {hf_model_name}") - - model_api_keys = { - "gpt": {"key_name": "OPENAI_API_KEY", "prefix": "sk-"}, - "groq": {"key_name": "GROQ_API_KEY", "prefix": "gsk"}, - "claude": {"key_name": "ANTHROPIC_API_KEY", "prefix": "sk-ant-"}, - "palm": {"key_name": "PALM_API_KEY", "prefix": None, "length": 15}, - "gemini": {"key_name": "GEMINI_API_KEY", "prefix": None, "length": 15}, - "default": {"key_name": "HUGGINGFACE_API_KEY", "prefix": "hf_"} - } - - for model, api_key_info in model_api_keys.items(): - if model in self.INTERPRETER_MODEL or model == "default": - api_key_name = api_key_info["key_name"] - api_key = os.getenv(api_key_name) - if api_key is None: - load_dotenv(dotenv_path=os.path.join(os.getcwd(), ".env")) - api_key = os.getenv(api_key_name) - if not api_key: - raise Exception(f"{api_key_name} not found in .env file.") - if api_key_info["prefix"] and not api_key.startswith(api_key_info["prefix"]): - raise Exception(f"{api_key_name} should start with '{api_key_info['prefix']}'. Please check your .env file.") - if api_key_info.get("length") and len(api_key) <= api_key_info["length"]: - raise Exception(f"{api_key_name} should have length greater than {api_key_info['length']}. Please check your .env file.") - break - - def initialize_mode(self): - self.CODE_MODE = True if self.args.mode == 'code' else False - self.SCRIPT_MODE = True if self.args.mode == 'script' else False - self.COMMAND_MODE = True if self.args.mode == 'command' else False - self.VISION_MODE = True if self.args.mode == 'vision' else False - self.CHAT_MODE = True if self.args.mode == 'chat' else False - if not self.SCRIPT_MODE and not self.COMMAND_MODE and not self.VISION_MODE and not self.CHAT_MODE: - self.CODE_MODE = True - - def get_prompt(self,message: str, chat_history: List[dict]) -> str: - system_message = None - - if self.CODE_MODE: - system_message = self.system_message - elif self.SCRIPT_MODE: - system_message = "Please generate a well-written script that is precise, easy to understand, and compatible with the current operating system." - elif self.COMMAND_MODE: - system_message = "Please generate a single line command that is precise, easy to understand, and compatible with the current operating system." - elif self.VISION_MODE: - system_message = "Please generate a well-written description of the image that is precise, easy to understand" - return system_message - elif self.CHAT_MODE: - system_message = "Please generate a well-written response that is precise, easy to understand" - - # Add the chat history to the prompt - if chat_history or len(chat_history) > 0: - system_message += "\n\n" + "\n\n" + "This is user chat history for this task and make sure to use this as reference to generate the answer if user asks for 'History' or 'Chat History'.\n\n" + "\n\n" + str(chat_history) + "\n\n" - - # Use the Messages API from Anthropic. - if 'claude-3' in self.INTERPRETER_MODEL: - messages=[ - { - "role": "user", - "content": [ - { - "type": "text", - "text": message - } - ] - } - ] - - # Use the Assistants API. - else: - messages = [ - {"role": "system", "content": system_message}, - {"role": "assistant", "content": "Please generate code wrapped inside triple backticks known as codeblock."}, - {"role": "user", "content": message} - ] - - return messages - - def execute_last_code(self,os_name): - try: - code_file,code_snippet = self.utility_manager.get_code_history(self.INTERPRETER_LANGUAGE) - - # check if the code is empty - if code_snippet is None: - self.logger.error("Code history is empty.") - print("Code history is empty. - Please use -s or --save_code to save the code.") - return - - display_code(code_snippet) - # Execute the code if the user has selected. - code_output, code_error = self.execute_code(code_snippet, os_name) - if code_output: - self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed successfully.") - display_code(code_output) - self.logger.info(f"Output: {code_output[:100]}") - elif code_error: - self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed with error.") - display_markdown_message(f"Error: {code_error}") - except Exception as exception: - self.logger.error(f"Error in processing command run code: {str(exception)}") - raise - - def generate_content(self,message, chat_history: list[tuple[str, str]], temperature=0.1, max_tokens=1024,config_values=None,image_file=None): - self.logger.info(f"Generating content with args: message={message}, chat_history={chat_history}, temperature={temperature}, max_tokens={max_tokens}, config_values={config_values}, image_file={image_file}") - self.logger.info(f"Interpreter model selected is '{self.INTERPRETER_MODEL}'") - - # Use the values from the config file if they are provided - if config_values: - temperature = float(config_values.get('temperature', temperature)) - max_tokens = int(config_values.get('max_tokens', max_tokens)) - api_base = str(config_values.get('api_base', None)) # Only for OpenAI. - - # Get the system prompt - messages = self.get_prompt(message, chat_history) - - # Check if the model is GPT 3.5/4/4o - if 'gpt' in self.INTERPRETER_MODEL: - self.logger.info("Model is GPT 3.5/4/4o") - if api_base != 'None': - # Set the custom language model provider - custom_llm_provider = "openai" - self.logger.info(f"Custom API mode selected for OpenAI, api_base={api_base}") - response = litellm.completion(self.INTERPRETER_MODEL, messages=messages, temperature=temperature, max_tokens=max_tokens, api_base=api_base, custom_llm_provider=custom_llm_provider) - else: - self.logger.info(f"Default API mode selected for OpenAI.") - response = litellm.completion(self.INTERPRETER_MODEL, messages=messages, temperature=temperature, max_tokens=max_tokens) - self.logger.info("Response received from completion function.") - - # Check if the model is PALM-2 - elif 'palm' in self.INTERPRETER_MODEL: - self.logger.info("Model is PALM-2.") - self.INTERPRETER_MODEL = "palm/chat-bison" - response = litellm.completion(self.INTERPRETER_MODEL, messages=messages,temperature=temperature,max_tokens=max_tokens) - self.logger.info("Response received from completion function.") - - # Check if the model is Gemini Pro - elif 'gemini' in self.INTERPRETER_MODEL: - - if self.INTERPRETER_MODE == 'vision': - # Import Gemini Vision only if the model is Gemini Pro Vision. - try: - from libs.gemini_vision import GeminiVision - self.gemini_vision = GeminiVision() - except Exception as exception: - self.logger.error(f"Error importing Gemini Vision: {exception}") - raise - - self.logger.info("Model is Gemini Pro Vision.") - response = None - - # Check if image_file is valid. - if not image_file: - self.logger.error("Image file is not valid or Corrupted.") - raise ValueError("Image file is not valid or Corrupted.") - - # Check if image contains URL. - if 'http' in image_file or 'https' in image_file or 'www.' in image_file: - self.logger.info("Image contains URL.") - response = self.gemini_vision.gemini_vision_url(prompt=messages,image_url=image_file) - else: - self.logger.info("Image contains file.") - response = self.gemini_vision.gemini_vision_path(prompt=messages,image_path=image_file) - - self.logger.info("Response received from completion function.") - return response # Return the response from Gemini Vision because its not coding model. - else: - self.logger.info("Model is Gemini Pro.") - self.INTERPRETER_MODEL = "gemini/gemini-pro" - response = litellm.completion(self.INTERPRETER_MODEL, messages=messages,temperature=temperature) - self.logger.info("Response received from completion function.") - - # Check if the model is Groq-AI - elif 'groq' in self.INTERPRETER_MODEL: - - if 'groq-llama2' in self.INTERPRETER_MODEL: - self.logger.info("Model is Groq/Llama2.") - self.INTERPRETER_MODEL = "groq/llama2-70b-4096" - elif 'groq-mixtral' in self.INTERPRETER_MODEL: - self.logger.info("Model is Groq/Mixtral.") - self.INTERPRETER_MODEL = "groq/mixtral-8x7b-32768" - elif 'groq-gemma' in self.INTERPRETER_MODEL: - self.logger.info("Model is Groq/Gemma.") - self.INTERPRETER_MODEL = "groq/gemma-7b-it" - - response = litellm.completion(self.INTERPRETER_MODEL, messages=messages,temperature=temperature,max_tokens=max_tokens) - self.logger.info("Response received from completion function.") - - # Check if the model is AnthropicAI - elif 'claude' in self.INTERPRETER_MODEL: - - if 'claude-2' in self.INTERPRETER_MODEL: - self.logger.info("Model is Claude-2.") - self.INTERPRETER_MODEL = "claude-2" - elif 'claude-2.1' in self.INTERPRETER_MODEL: - self.logger.info("Model is claude-2.1.") - self.INTERPRETER_MODEL = "claude-2.1" - - # Support for Claude-3 Models - elif 'claude-3' in self.INTERPRETER_MODEL: - - if 'claude-3-sonnet' in self.INTERPRETER_MODEL: - self.logger.info("Model is claude-3-sonnet.") - self.INTERPRETER_MODEL = "claude-3-sonnet-20240229" - - elif 'claude-3-opus' in self.INTERPRETER_MODEL: - self.logger.info("Model is claude-3-opus.") - self.INTERPRETER_MODEL = "claude-3-opus-20240229" - - response = litellm.completion(self.INTERPRETER_MODEL, messages=messages,temperature=temperature,max_tokens=max_tokens) - self.logger.info("Response received from completion function.") - - # Check if the model is Local Model - elif 'local' in self.INTERPRETER_MODEL: - self.logger.info("Model is Local model") - if api_base != 'None': - # Set the custom language model provider - custom_llm_provider = "openai" - self.logger.info(f"Custom API mode selected for Local Model, api_base={api_base}") - response = litellm.completion(self.INTERPRETER_MODEL, messages=messages, temperature=temperature, max_tokens=max_tokens, api_base=api_base, custom_llm_provider=custom_llm_provider) - else: - raise Exception("Exception api base not set for custom model") - self.logger.info("Response received from completion function.") - - - # Check if model are from Hugging Face. - else: - # Add huggingface/ if not present in the model name. - if 'huggingface/' not in self.INTERPRETER_MODEL: - self.INTERPRETER_MODEL = 'huggingface/' + self.INTERPRETER_MODEL - - self.logger.info(f"Model is from Hugging Face. {self.INTERPRETER_MODEL}") - response = litellm.completion(self.INTERPRETER_MODEL, messages=messages,temperature=temperature,max_tokens=max_tokens) - self.logger.info("Response received from completion function.") - - self.logger.info(f"Generated text {response}") - generated_text = self.utility_manager._extract_content(response) - self.logger.info(f"Generated content {generated_text}") - return generated_text - - def get_code_prompt(self, task, os_name): - - if self.INTERPRETER_LANGUAGE not in ['python', 'javascript']: - self.INTERPRETER_LANGUAGE = 'python' - - prompt = ( - f"Generate the {self.INTERPRETER_LANGUAGE} code for the following task: '{task}'.\n" - f"Ensure the code is well-structured, easy to read, and follows best practices for {self.INTERPRETER_LANGUAGE}.\n" - f"The code should be compatible with the operating system: {os_name}, considering any platform-specific features or limitations.\n" - "Handle potential errors gracefully, and ensure the solution is efficient and concise.\n" - "If there are multiple possible solutions, choose the most optimized one." - ) - return prompt - - def get_script_prompt(self, task, os_name): - os_name_lower = os_name.lower() - - # Combined dictionary for both language mapping and script type - language_map = { - 'darwin': ('applescript', 'AppleScript'), - 'linux': ('bash', 'Bash Shell script'), - 'windows': ('powershell', 'Powershell script') - } - - # Find matching language and script type or default to Python - self.INTERPRETER_LANGUAGE, script_type = next( - (lang, stype) for key, (lang, stype) in language_map.items() if key in os_name_lower - ) if any(key in os_name_lower for key in language_map) else ('python', 'script') - - prompt = ( - f"Generate only the {script_type} for this task:\n" - f"Task: '{task}'\n" - f"Operating System: {os_name}\n" - "NOTE: Ensure the script is compatible with the specified OS and version.\n" - "Output should only contain the script, with no additional text." - ) - - self.logger.info(f"Script Prompt: {prompt}") - return prompt - - def get_command_prompt(self, task, os_name): - prompt = ( - f"Generate only the single terminal command for this task:\n" - f"Task: '{task}'\n" - f"Operating System: {os_name}\n" - "NOTE: Ensure the command is compatible with the specified OS and version.\n" - "Output should only contain the command, with no additional text." - ) - self.logger.info(f"Command Prompt: {prompt}") - return prompt - - def handle_vision_mode(self, task): - prompt = f"Give accurate and detailed information about the image provided and be very detailed about the image '{task}'." - return prompt - - def handle_chat_mode(self, task): - prompt = f"Give accurate and detailed response to the question provided and be very detailed about the question '{task}'." - return prompt - - def get_mode_prompt(self, task, os_name): - if self.CODE_MODE: - self.logger.info("Getting code prompt.") - return self.get_code_prompt(task, os_name) - elif self.SCRIPT_MODE: - self.logger.info("Getting script prompt.") - return self.get_script_prompt(task, os_name) - elif self.COMMAND_MODE: - self.logger.info("Getting command prompt.") - return self.get_command_prompt(task, os_name) - elif self.VISION_MODE: - self.logger.info("Getting vision prompt.") - return self.handle_vision_mode(task) - elif self.CHAT_MODE: - self.logger.info("Getting chat prompt.") - return self.handle_chat_mode(task) - - def execute_code(self, extracted_code, os_name): - # If the interpreter mode is Vision, do not execute the code. - if self.INTERPRETER_MODE in ['vision','chat']: - return None, None - - execute = 'y' if self.EXECUTE_CODE else input("Execute the code? (Y/N): ") - if execute.lower() == 'y': - try: - code_output, code_error = "", "" - if self.SCRIPT_MODE: - code_output, code_error = self.code_interpreter.execute_script(extracted_code, os_type=os_name) - elif self.COMMAND_MODE: - code_output, code_error = self.code_interpreter.execute_command(extracted_code) - elif self.CODE_MODE: - code_output, code_error = self.code_interpreter.execute_code(extracted_code, language=self.INTERPRETER_LANGUAGE) - return code_output, code_error - except Exception as exception: - self.logger.error(f"Error occurred while executing code: {str(exception)}") - return None, str(exception) # Return error message as second element of tuple - else: - return None, None # Return None, None if user chooses not to execute the code - - def interpreter_main(self,version): - - self.interpreter_version = version - self.logger.info(f"Interpreter - v{self.interpreter_version}") - - os_platform = self.utility_manager.get_os_platform() - os_name = os_platform[0] - generated_output = None - code_snippet = None - code_output, code_error = None, None - extracted_file_name = None - - # Seting the mode. - if self.SCRIPT_MODE: - self.INTERPRETER_MODE = 'script' - elif self.COMMAND_MODE: - self.INTERPRETER_MODE = 'command' - elif self.VISION_MODE: - self.INTERPRETER_MODE = 'vision' - elif self.CHAT_MODE: - self.INTERPRETER_MODE = 'chat' - - start_sep = str(self.config_values.get('start_sep', '```')) - end_sep = str(self.config_values.get('end_sep', '```')) - skip_first_line = self.config_values.get('skip_first_line', 'False') == 'True' - - self.logger.info(f"Mode: {self.INTERPRETER_MODE} Start separator: {start_sep}, End separator: {end_sep}, Skip first line: {skip_first_line}") - - # Display system and Assistant information. - input_prompt_mode = "File" if self.INTERPRETER_PROMPT_FILE else "Input" - display_code(f"OS: '{os_name}', Language: '{self.INTERPRETER_LANGUAGE}', Mode: '{self.INTERPRETER_MODE}', Prompt: '{input_prompt_mode}', Model: '{self.INTERPRETER_MODEL}'") - - # Display the welcome message. - display_markdown_message("Welcome to the **Interpreter**, I'm here to **assist** you with your everyday tasks. " - "\nEnter your task and I'll do my best to help you out.") - - # Main System and Assistant loop. - running = True - while running: - try: - task = None - - if self.INTERPRETER_PROMPT_INPUT: - self.logger.info(f"Reading prompt from input.") - # Main input prompt - System and Assistant. - task = input("> ") - elif self.INTERPRETER_PROMPT_FILE: - prompt_file_name = self.args.file - - # Setting the prompt file path. - if not prompt_file_name: - prompt_file_name = 'prompt.txt' - - prompt_file_path = os.path.join(os.getcwd(),'system',prompt_file_name) - - # check if the file exists. - if not os.path.exists(prompt_file_path): - self.logger.error(f"Prompt file not found: {prompt_file_path}") - user_confirmation = input(f"Create a new prompt file (Y/N)?: ") - if user_confirmation.lower() == 'y': - self.logger.info(f"Creating new prompt file.") - self.utility_manager.create_file(prompt_file_path) - display_markdown_message(f"New prompt file created **successfully** ") - else: - self.logger.info(f"User declined to create new prompt file.") - display_markdown_message(f"User declined to create new prompt file.\nSwitching to input mode.") - # Switch to input mode. - self.INTERPRETER_PROMPT_INPUT = True - self.INTERPRETER_PROMPT_FILE = False - - continue - continue - - display_markdown_message(f"\nEnter your task in the file **'{prompt_file_path}'**") - - # File mode command section. - prompt_confirmation = input(f"Execute the prompt (Y/N/P/C) (P = Prompt Mode,C = Command Mode)?: ") - if prompt_confirmation.lower() == 'y': - self.logger.info(f"Executing prompt from file.") + logger = None + client = None + interpreter_version = None + + def __init__(self, args): + self.args = args + self.history = [] + self.history_count = 3 + self.history_file = "history/history.json" + self.utility_manager = UtilityManager() + self.code_interpreter = CodeInterpreter() + self.package_manager = PackageManager() + self.history_manager = History(self.history_file) + self.logger = Logger.initialize_logger("logs/interpreter.log") + self.client = None + self.config_values = None + self.system_message = "" + self.gemini_vision = None + self.initialize() + + def initialize(self): + self.INTERPRETER_LANGUAGE = self.args.lang if self.args.lang else 'python' + self.SAVE_CODE = self.args.save_code + self.EXECUTE_CODE = self.args.exec + self.DISPLAY_CODE = self.args.display_code + self.INTERPRETER_MODEL = self.args.model if self.args.model else None + self.logger.info(f"Interpreter args model selected is '{self.args.model}") + self.logger.info(f"Interpreter model selected is '{self.INTERPRETER_MODEL}'") + self.system_message = "" + self.INTERPRETER_MODE = 'code' + + if self.args.file is None: + self.INTERPRETER_PROMPT_FILE = False + self.INTERPRETER_PROMPT_INPUT = True + else: + self.INTERPRETER_PROMPT_FILE = True + self.INTERPRETER_PROMPT_INPUT = False + # If the user didn't provide a file name, use the default one + if self.args.file == '': + self.args.file = 'prompt.txt' + + # Set the history optional(Argparse) + if hasattr(self.args, 'history'): + self.INTERPRETER_HISTORY = self.args.history + else: + self.INTERPRETER_HISTORY = False + + if self.INTERPRETER_MODE == 'vision': + self.system_message = "You are top tier image captioner and image analyzer. Please generate a well-written description of the image that is precise, easy to understand" + elif self.INTERPRETER_MODE == 'chat': + self.system_message = "You are top tier chatbot. Please generate a well-written response that is precise, easy to understand" + else: + # Open file system_message.txt to a variable system_message + try: + with open('system/system_message.txt', 'r') as file: + self.system_message = file.read() + if self.system_message != "": + self.logger.info(f"System message read successfully") + except Exception as exception: + self.logger.error(f"Error occurred while reading system_message.txt: {str(exception)}") + raise + + # Initialize client and mode. + self.initialize_client() + self.initialize_mode() + + try: # Make this as optional step to have readline history. + self.utility_manager.initialize_readline_history() + except: + self.logger.error(f"Exception on initializing readline history") + + def initialize_client(self): + load_dotenv() + self.logger.info("Initializing Client") + + self.logger.info(f"Interpreter model selected is '{self.INTERPRETER_MODEL}'") + if self.INTERPRETER_MODEL is None or self.INTERPRETER_MODEL == "": + self.logger.info("HF_MODEL is not provided, using default model.") + config_file_name = f"configs/gpt-4o.config" # Setting default model to GPT 4o. + else: + config_file_name = f"configs/{self.INTERPRETER_MODEL}.config" + + self.logger.info(f"Reading config file {config_file_name}") + self.config_values = self.utility_manager.read_config_file(config_file_name) + self.INTERPRETER_MODEL = str(self.config_values.get('HF_MODEL', self.INTERPRETER_MODEL)) + hf_model_name = self.INTERPRETER_MODEL.strip().split("/")[-1] + + # skip init client for local models.(Bug#10 https://github.com/haseeb-heaven/code-interpreter/issues/10) + if 'local' in self.INTERPRETER_MODEL: + self.logger.info("Skipping client initialization for local model.") + # Add OpenAI API key if not present in the environment variables. (https://github.com/haseeb-heaven/code-interpreter/issues/13) + # Fixed OpenAI API Key name (https://github.com/haseeb-heaven/code-interpreter/issues/20) + api_key = os.environ['OPENAI_API_KEY'] + + if api_key: + self.logger.info("Using local API key from environment variables.") + + if api_key is None: + load_dotenv(dotenv_path=os.path.join(os.getcwd(), ".env")) + api_key = os.getenv('OPENAI_API_KEY') + if api_key is None: + self.logger.info("Setting default local API key for local models.") + os.environ['OPENAI_API_KEY'] = "sk-1234567890" # Setting default API key for local models. + return + + self.logger.info(f"Using model {hf_model_name}") + + model_api_keys = { + "gpt": {"key_name": "OPENAI_API_KEY", "prefix": "sk-"}, + "groq": {"key_name": "GROQ_API_KEY", "prefix": "gsk"}, + "claude": {"key_name": "ANTHROPIC_API_KEY", "prefix": "sk-ant-"}, + "palm": {"key_name": "PALM_API_KEY", "prefix": None, "length": 15}, + "gemini": {"key_name": "GEMINI_API_KEY", "prefix": None, "length": 15}, + "default": {"key_name": "HUGGINGFACE_API_KEY", "prefix": "hf_"} + } + + for model, api_key_info in model_api_keys.items(): + if model in self.INTERPRETER_MODEL or model == "default": + api_key_name = api_key_info["key_name"] + api_key = os.getenv(api_key_name) + if api_key is None: + load_dotenv(dotenv_path=os.path.join(os.getcwd(), ".env")) + api_key = os.getenv(api_key_name) + if not api_key: + raise Exception(f"{api_key_name} not found in .env file.") + if api_key_info["prefix"] and not api_key.startswith(api_key_info["prefix"]): + raise Exception(f"{api_key_name} should start with '{api_key_info['prefix']}'. Please check your .env file.") + if api_key_info.get("length") and len(api_key) <= api_key_info["length"]: + raise Exception(f"{api_key_name} should have length greater than {api_key_info['length']}. Please check your .env file.") + break + + def initialize_mode(self): + self.CODE_MODE = True if self.args.mode == 'code' else False + self.SCRIPT_MODE = True if self.args.mode == 'script' else False + self.COMMAND_MODE = True if self.args.mode == 'command' else False + self.VISION_MODE = True if self.args.mode == 'vision' else False + self.CHAT_MODE = True if self.args.mode == 'chat' else False + if not self.SCRIPT_MODE and not self.COMMAND_MODE and not self.VISION_MODE and not self.CHAT_MODE: + self.CODE_MODE = True + + def get_prompt(self,message: str, chat_history: List[dict]) -> str: + system_message = None + + if self.CODE_MODE: + system_message = self.system_message + elif self.SCRIPT_MODE: + system_message = "Please generate a well-written script that is precise, easy to understand, and compatible with the current operating system." + elif self.COMMAND_MODE: + system_message = "Please generate a single line command that is precise, easy to understand, and compatible with the current operating system." + elif self.VISION_MODE: + system_message = "Please generate a well-written description of the image that is precise, easy to understand" + return system_message + elif self.CHAT_MODE: + system_message = "Please generate a well-written response that is precise, easy to understand" + + # Add the chat history to the prompt + if chat_history or len(chat_history) > 0: + system_message += "\n\n" + "\n\n" + "This is user chat history for this task and make sure to use this as reference to generate the answer if user asks for 'History' or 'Chat History'.\n\n" + "\n\n" + str(chat_history) + "\n\n" + + # Use the Messages API from Anthropic. + if 'claude-3' in self.INTERPRETER_MODEL: + messages=[ + { + "role": "user", + "content": [ + { + "type": "text", + "text": message + } + ] + } + ] + + # Use the Assistants API. + else: + messages = [ + {"role": "system", "content": system_message}, + {"role": "assistant", "content": "Please generate code wrapped inside triple backticks known as codeblock."}, + {"role": "user", "content": message} + ] + + return messages + + def execute_last_code(self,os_name): + try: + code_file,code_snippet = self.utility_manager.get_code_history(self.INTERPRETER_LANGUAGE) + + # check if the code is empty + if code_snippet is None: + self.logger.error("Code history is empty.") + print("Code history is empty. - Please use -s or --save_code to save the code.") + return + + display_code(code_snippet) + # Execute the code if the user has selected. + code_output, code_error = self.execute_code(code_snippet, os_name) + if code_output: + self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed successfully.") + display_code(code_output) + self.logger.info(f"Output: {code_output[:100]}") + elif code_error: + self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed with error.") + display_markdown_message(f"Error: {code_error}") + except Exception as exception: + self.logger.error(f"Error in processing command run code: {str(exception)}") + raise + + def generate_content(self,message, chat_history: list[tuple[str, str]], temperature=0.1, max_tokens=1024,config_values=None,image_file=None): + self.logger.info(f"Generating content with args: message={message}, chat_history={chat_history}, temperature={temperature}, max_tokens={max_tokens}, config_values={config_values}, image_file={image_file}") + self.logger.info(f"Interpreter model selected is '{self.INTERPRETER_MODEL}'") + + # Use the values from the config file if they are provided + if config_values: + temperature = float(config_values.get('temperature', temperature)) + max_tokens = int(config_values.get('max_tokens', max_tokens)) + api_base = str(config_values.get('api_base', None)) # Only for OpenAI. + + # Get the system prompt + messages = self.get_prompt(message, chat_history) + + # Check if the model is GPT 3.5/4/4o + if 'gpt' in self.INTERPRETER_MODEL: + self.logger.info("Model is GPT 3.5/4/4o") + if api_base != 'None': + # Set the custom language model provider + custom_llm_provider = "openai" + self.logger.info(f"Custom API mode selected for OpenAI, api_base={api_base}") + response = litellm.completion(self.INTERPRETER_MODEL, messages=messages, temperature=temperature, max_tokens=max_tokens, api_base=api_base, custom_llm_provider=custom_llm_provider) + else: + self.logger.info(f"Default API mode selected for OpenAI.") + response = litellm.completion(self.INTERPRETER_MODEL, messages=messages, temperature=temperature, max_tokens=max_tokens) + self.logger.info("Response received from completion function.") + + # Check if the model is PALM-2 + elif 'palm' in self.INTERPRETER_MODEL: + self.logger.info("Model is PALM-2.") + self.INTERPRETER_MODEL = "palm/chat-bison" + response = litellm.completion(self.INTERPRETER_MODEL, messages=messages,temperature=temperature,max_tokens=max_tokens) + self.logger.info("Response received from completion function.") + + # Check if the model is Gemini Pro + elif 'gemini' in self.INTERPRETER_MODEL: + + if self.INTERPRETER_MODE == 'vision': + # Import Gemini Vision only if the model is Gemini Pro Vision. + try: + from libs.gemini_vision import GeminiVision + self.gemini_vision = GeminiVision() + except Exception as exception: + self.logger.error(f"Error importing Gemini Vision: {exception}") + raise + + self.logger.info("Model is Gemini Pro Vision.") + response = None + + # Check if image_file is valid. + if not image_file: + self.logger.error("Image file is not valid or Corrupted.") + raise ValueError("Image file is not valid or Corrupted.") + + # Check if image contains URL. + if 'http' in image_file or 'https' in image_file or 'www.' in image_file: + self.logger.info("Image contains URL.") + response = self.gemini_vision.gemini_vision_url(prompt=messages,image_url=image_file) + else: + self.logger.info("Image contains file.") + response = self.gemini_vision.gemini_vision_path(prompt=messages,image_path=image_file) + + self.logger.info("Response received from completion function.") + return response # Return the response from Gemini Vision because its not coding model. + else: + self.logger.info("Model is Gemini Pro.") + self.INTERPRETER_MODEL = "gemini/gemini-pro" + response = litellm.completion(self.INTERPRETER_MODEL, messages=messages,temperature=temperature) + self.logger.info("Response received from completion function.") + + # Check if the model is Groq-AI + elif 'groq' in self.INTERPRETER_MODEL: + + if 'groq-llama2' in self.INTERPRETER_MODEL: + self.logger.info("Model is Groq/Llama2.") + self.INTERPRETER_MODEL = "groq/llama2-70b-4096" + elif 'groq-mixtral' in self.INTERPRETER_MODEL: + self.logger.info("Model is Groq/Mixtral.") + self.INTERPRETER_MODEL = "groq/mixtral-8x7b-32768" + elif 'groq-gemma' in self.INTERPRETER_MODEL: + self.logger.info("Model is Groq/Gemma.") + self.INTERPRETER_MODEL = "groq/gemma-7b-it" + + response = litellm.completion(self.INTERPRETER_MODEL, messages=messages,temperature=temperature,max_tokens=max_tokens) + self.logger.info("Response received from completion function.") + + # Check if the model is AnthropicAI + elif 'claude' in self.INTERPRETER_MODEL: + + if 'claude-2' in self.INTERPRETER_MODEL: + self.logger.info("Model is Claude-2.") + self.INTERPRETER_MODEL = "claude-2" + elif 'claude-2.1' in self.INTERPRETER_MODEL: + self.logger.info("Model is claude-2.1.") + self.INTERPRETER_MODEL = "claude-2.1" + + # Support for Claude-3 Models + elif 'claude-3' in self.INTERPRETER_MODEL: + + if 'claude-3-sonnet' in self.INTERPRETER_MODEL: + self.logger.info("Model is claude-3-sonnet.") + self.INTERPRETER_MODEL = "claude-3-sonnet-20240229" + + elif 'claude-3-opus' in self.INTERPRETER_MODEL: + self.logger.info("Model is claude-3-opus.") + self.INTERPRETER_MODEL = "claude-3-opus-20240229" + + response = litellm.completion(self.INTERPRETER_MODEL, messages=messages,temperature=temperature,max_tokens=max_tokens) + self.logger.info("Response received from completion function.") + + # Check if the model is Local Model + elif 'local' in self.INTERPRETER_MODEL: + self.logger.info("Model is Local model") + if api_base != 'None': + # Set the custom language model provider + custom_llm_provider = "openai" + self.logger.info(f"Custom API mode selected for Local Model, api_base={api_base}") + response = litellm.completion(self.INTERPRETER_MODEL, messages=messages, temperature=temperature, max_tokens=max_tokens, api_base=api_base, custom_llm_provider=custom_llm_provider) + else: + raise Exception("Exception api base not set for custom model") + self.logger.info("Response received from completion function.") + + + # Check if model are from Hugging Face. + else: + # Add huggingface/ if not present in the model name. + if 'huggingface/' not in self.INTERPRETER_MODEL: + self.INTERPRETER_MODEL = 'huggingface/' + self.INTERPRETER_MODEL + + self.logger.info(f"Model is from Hugging Face. {self.INTERPRETER_MODEL}") + response = litellm.completion(self.INTERPRETER_MODEL, messages=messages,temperature=temperature,max_tokens=max_tokens) + self.logger.info("Response received from completion function.") + + self.logger.info(f"Generated text {response}") + generated_text = self.utility_manager._extract_content(response) + self.logger.info(f"Generated content {generated_text}") + return generated_text + + def get_code_prompt(self, task, os_name): + + if self.INTERPRETER_LANGUAGE not in ['python', 'javascript']: + self.INTERPRETER_LANGUAGE = 'python' + + prompt = ( + f"Generate the {self.INTERPRETER_LANGUAGE} code for the following task: '{task}'.\n" + f"Ensure the code is well-structured and there should be no comments or no extra text, easy to read, and follows best practices for {self.INTERPRETER_LANGUAGE}.\n" + f"The code should be compatible with the operating system: {os_name}, considering any platform-specific features or limitations.\n" + "Handle potential errors gracefully, and ensure the solution is efficient and concise.\n" + "If there are multiple possible solutions, choose the most optimized one." + ) + return prompt + + def get_script_prompt(self, task, os_name): + os_name_lower = os_name.lower() + + # Combined dictionary for both language mapping and script type + language_map = { + 'darwin': ('applescript', 'AppleScript'), + 'linux': ('bash', 'Bash Shell script'), + 'windows': ('powershell', 'Powershell script') + } + + # Find matching language and script type or default to Python + self.INTERPRETER_LANGUAGE, script_type = next( + (lang, stype) for key, (lang, stype) in language_map.items() if key in os_name_lower + ) if any(key in os_name_lower for key in language_map) else ('python', 'script') + + prompt = ( + f"Generate only the {script_type} for this task:\n" + f"Task: '{task}'\n" + f"Operating System: {os_name}\n" + "NOTE: Ensure the script is compatible with the specified OS and version.\n" + "Output should only contain the script, with no additional text." + ) + + self.logger.info(f"Script Prompt: {prompt}") + return prompt + + def get_command_prompt(self, task, os_name): + prompt = ( + f"Generate only the single terminal command for this task:\n" + f"Task: '{task}'\n" + f"Operating System: {os_name}\n" + "NOTE: Ensure the command is compatible with the specified OS and version.\n" + "Output should only contain the command, with no additional text." + ) + self.logger.info(f"Command Prompt: {prompt}") + return prompt + + def handle_vision_mode(self, task): + prompt = f"Give accurate and detailed information about the image provided and be very detailed about the image '{task}'." + return prompt + + def handle_chat_mode(self, task): + prompt = f"Give accurate and detailed response to the question provided and be very detailed about the question '{task}'." + return prompt + + def get_mode_prompt(self, task, os_name): + if self.CODE_MODE: + self.logger.info("Getting code prompt.") + return self.get_code_prompt(task, os_name) + elif self.SCRIPT_MODE: + self.logger.info("Getting script prompt.") + return self.get_script_prompt(task, os_name) + elif self.COMMAND_MODE: + self.logger.info("Getting command prompt.") + return self.get_command_prompt(task, os_name) + elif self.VISION_MODE: + self.logger.info("Getting vision prompt.") + return self.handle_vision_mode(task) + elif self.CHAT_MODE: + self.logger.info("Getting chat prompt.") + return self.handle_chat_mode(task) + + def execute_code(self, extracted_code, os_name): + # If the interpreter mode is Vision, do not execute the code. + if self.INTERPRETER_MODE in ['vision','chat']: + return None, None + + execute = 'y' if self.EXECUTE_CODE else input("Execute the code? (Y/N): ") + if execute.lower() == 'y': + try: + code_output, code_error = "", "" + if self.SCRIPT_MODE: + code_output, code_error = self.code_interpreter.execute_script(extracted_code, os_type=os_name) + elif self.COMMAND_MODE: + code_output, code_error = self.code_interpreter.execute_command(extracted_code) + elif self.CODE_MODE: + code_output, code_error = self.code_interpreter.execute_code(extracted_code, language=self.INTERPRETER_LANGUAGE) + return code_output, code_error + except Exception as exception: + self.logger.error(f"Error occurred while executing code: {str(exception)}") + return None, str(exception) # Return error message as second element of tuple + else: + return None, None # Return None, None if user chooses not to execute the code + + def interpreter_main(self,version): + + self.interpreter_version = version + self.logger.info(f"Interpreter - v{self.interpreter_version}") + + os_platform = self.utility_manager.get_os_platform() + os_name = os_platform[0] + generated_output = None + code_snippet = None + code_output, code_error = None, None + extracted_file_name = None + + # Seting the mode. + if self.SCRIPT_MODE: + self.INTERPRETER_MODE = 'script' + elif self.COMMAND_MODE: + self.INTERPRETER_MODE = 'command' + elif self.VISION_MODE: + self.INTERPRETER_MODE = 'vision' + elif self.CHAT_MODE: + self.INTERPRETER_MODE = 'chat' + + start_sep = str(self.config_values.get('start_sep', '```')) + end_sep = str(self.config_values.get('end_sep', '```')) + skip_first_line = self.config_values.get('skip_first_line', 'False') == 'True' + + self.logger.info(f"Mode: {self.INTERPRETER_MODE} Start separator: {start_sep}, End separator: {end_sep}, Skip first line: {skip_first_line}") + + # Display system and Assistant information. + input_prompt_mode = "File" if self.INTERPRETER_PROMPT_FILE else "Input" + display_code(f"OS: '{os_name}', Language: '{self.INTERPRETER_LANGUAGE}', Mode: '{self.INTERPRETER_MODE}', Prompt: '{input_prompt_mode}', Model: '{self.INTERPRETER_MODEL}'") + + # Display the welcome message. + display_markdown_message("Welcome to the **Interpreter**, I'm here to **assist** you with your everyday tasks. " + "\nEnter your task and I'll do my best to help you out.") + + # Main System and Assistant loop. + running = True + while running: + try: + task = None + + if self.INTERPRETER_PROMPT_INPUT: + self.logger.info(f"Reading prompt from input.") + # Main input prompt - System and Assistant. + task = input("> ") + elif self.INTERPRETER_PROMPT_FILE: + prompt_file_name = self.args.file + + # Setting the prompt file path. + if not prompt_file_name: + prompt_file_name = 'prompt.txt' + + prompt_file_path = os.path.join(os.getcwd(),'system',prompt_file_name) + + # check if the file exists. + if not os.path.exists(prompt_file_path): + self.logger.error(f"Prompt file not found: {prompt_file_path}") + user_confirmation = input(f"Create a new prompt file (Y/N)?: ") + if user_confirmation.lower() == 'y': + self.logger.info(f"Creating new prompt file.") + self.utility_manager.create_file(prompt_file_path) + display_markdown_message(f"New prompt file created **successfully** ") + else: + self.logger.info(f"User declined to create new prompt file.") + display_markdown_message(f"User declined to create new prompt file.\nSwitching to input mode.") + # Switch to input mode. + self.INTERPRETER_PROMPT_INPUT = True + self.INTERPRETER_PROMPT_FILE = False + + continue + continue + + display_markdown_message(f"\nEnter your task in the file **'{prompt_file_path}'**") + + # File mode command section. + prompt_confirmation = input(f"Execute the prompt (Y/N/P/C) (P = Prompt Mode,C = Command Mode)?: ") + if prompt_confirmation.lower() == 'y': + self.logger.info(f"Executing prompt from file.") - self.logger.info(f"Executing prompt from file {prompt_file_path}") - task = self.utility_manager.read_file(prompt_file_path) - elif prompt_confirmation.lower() == 'n': - self.logger.info(f"Waiting for user confirmation to execute prompt from file.") - print("Waiting for user confirmation to execute prompt from file.") - self.utility_manager.clear_screen() - continue - elif prompt_confirmation.lower() == 'p': - self.INTERPRETER_PROMPT_INPUT = True - self.INTERPRETER_PROMPT_FILE = False - self.logger.info(f"Changing input mode to prompt from file.") - self.utility_manager.clear_screen() - continue - elif prompt_confirmation.lower() == 'c': - self.logger.info(f"Changing input mode to command from file.") - task = input("> ") - else: - # Invalid input mode (0x000022) - self.logger.error("Invalid input mode.") - self.utility_manager.clear_screen() - continue - - # EXIT - Command section. - if task.lower() == '/exit': - break - - # HELP - Command section. - elif task.lower() == '/help': - self.utility_manager.display_help() - continue - - # CLEAR - Command section. - elif task.lower() == '/clear': - self.utility_manager.clear_screen() - continue - - # VERSION - Command section. - elif task.lower() == '/version': - self.utility_manager.display_version(self.interpreter_version) - continue - - # PROMPT - Command section. - elif task.lower() == '/prompt': - if self.INTERPRETER_PROMPT_INPUT: - self.INTERPRETER_PROMPT_INPUT = False - self.INTERPRETER_PROMPT_FILE = True - self.logger.info(f"Input mode changed to File.") - else: - self.INTERPRETER_PROMPT_INPUT = True - self.INTERPRETER_PROMPT_FILE = False - self.logger.info(f"Input mode changed to Prompt.") - continue - - # HISTORY - Command section. - elif task.lower() == '/history': - self.INTERPRETER_HISTORY = not self.INTERPRETER_HISTORY - display_markdown_message(f"History is {'enabled' if self.INTERPRETER_HISTORY else 'disabled'}") - continue - - # SHELL - Command section. - elif any(command in task.lower() for command in ['/shell ']): - shell_command = shlex.split(task)[1:] - shell_command = ' '.join(shell_command) - shell_output, shell_error = self.code_interpreter.execute_command(shell_command) - if shell_output: - self.logger.info(f"Shell command executed successfully.") - display_code(shell_output) - self.logger.info(f"Output: {shell_output[:100]}") - elif shell_error: - self.logger.info(f"Shell command executed with error.") - display_markdown_message(f"Error: {shell_error}") - continue - - # LOG - Command section. - elif task.lower() == '/log': - # Toggle the log level to Verbose/Silent. - - logger_mode = Logger.get_current_level() - logger_mode = logger_mode.lower() - - if logger_mode == 'debug': - Logger.set_silent_mode() - display_markdown_message(f"Logger mode changed to **Silent**.") - else: - Logger.set_verbose_mode() - display_markdown_message(f"Logger mode changed to **Verbose**.") - continue - - # LIST - Command section. - elif task.lower() == '/list': - # Get the models info - - # Reading all the config files in the configs folder. - configs_path = os.path.join(os.getcwd(), 'configs') - configs_files = [file for file in os.listdir(configs_path) if file.endswith('.config')] - - # Removing all extensions from the list. - configs_files = [os.path.splitext(file)[0] for file in configs_files] - - # Printing the models info. - print('Available models:\n') - for index, model in enumerate(configs_files, 1): - print(f'{index}. {model}') - print('', end='\n') - - # Print all the available modes. - print('Available modes:\n') - for index, mode in enumerate(['code','script','command','vision','chat'], 1): - print(f'{index}. {mode}',end='\n') - - # Print all the available languages. - print('\nAvailable languages:\n') - for index, language in enumerate(['python','javascript'], 1): - print(f'{index}. {language}') - - continue - - # UPGRAGE - Command section. - elif task.lower() == '/upgrade': - self.utility_manager.upgrade_interpreter() - continue - - # EXECUTE - Command section. - elif task.lower() == '/execute': - self.execute_last_code(os_name) - continue - - # SAVE - Command section. - elif task.lower() == '/save': - latest_code_extension = 'py' if self.INTERPRETER_LANGUAGE == 'python' else 'js' - latest_code_name = f"output/code_{time.strftime('%Y_%m_%d-%H_%M_%S', time.localtime())}." + latest_code_extension - latest_code = code_snippet - self.code_interpreter.save_code(latest_code_name, latest_code) - display_markdown_message(f"Code saved successfully to {latest_code_name}.") - continue - - # EDIT - Command section. - elif task.lower() == '/edit': - code_file,code_snippet = self.utility_manager.get_code_history(self.INTERPRETER_LANGUAGE) - - # Get the OS platform. - os_platform = self.utility_manager.get_os_platform() - - # Check if user wants to open in vim? - vim_open = input("Open the code in vim editor (Y/N):") - if vim_open.lower() == 'y': - self.logger.info(f"Opening code in **vim** editor {code_file.name if not isinstance(code_file, str) else code_file}") - subprocess.call(['vim', code_file.name if not isinstance(code_file, str) else code_file]) - continue - else: - # Open the code in default editor. - if os_platform[0].lower() == 'macos': - self.logger.info(f"Opening code in default editor {code_file.name if not isinstance(code_file, str) else code_file}") - subprocess.call(('open', code_file.name if not isinstance(code_file, str) else code_file)) - elif os_platform[0].lower() == 'linux': - subprocess.call(('xdg-open', code_file.name if not isinstance(code_file, str) else code_file)) - elif os_platform[0].lower() == 'windows': - os.startfile(code_file.name if not isinstance(code_file, str) else code_file) - continue - - # DEBUG - Command section. - elif task.lower() == '/debug': - - if not code_error: - code_error = code_output - - if not code_error: - display_markdown_message(f"Error: No error found in the code to fix.") - continue - - debug_prompt = f"Fix the errors in {self.INTERPRETER_LANGUAGE} language.\nCode is \n'{code_snippet}'\nAnd Error is \n'{code_error}'\n give me output only in code and no other text or explanation. And comment in code where you fixed the error.\n" - - # Start the LLM Request. - self.logger.info(f"Debug Prompt: {debug_prompt}") - generated_output = self.generate_content(debug_prompt, self.history, config_values=self.config_values,image_file=extracted_file_name) - - # Extract the code from the generated output. - self.logger.info(f"Generated output type {type(generated_output)}") - code_snippet = self.code_interpreter.extract_code(generated_output, start_sep, end_sep, skip_first_line,self.CODE_MODE) - - # Display the extracted code. - self.logger.info(f"Extracted code: {code_snippet[:50]}") - - if self.DISPLAY_CODE: - display_code(code_snippet) - self.logger.info("Code extracted successfully.") - - # Execute the code if the user has selected. - code_output, code_error = self.execute_code(code_snippet, os_name) - - if code_output: - self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed successfully.") - display_code(code_output) - self.logger.info(f"Output: {code_output[:100]}") - elif code_error: - self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed with error.") - display_markdown_message(f"Error: {code_error}") - continue - - # MODE - Command section. - elif any(command in task.lower() for command in ['/mode ']): - mode = task.split(' ')[1] - if mode: - if not mode.lower() in ['code','script','command','vision','chat']: - mode = 'code' - display_markdown_message(f"The input mode is not supported. Mode changed to {mode}," - "\nUse '/list' command to get the list of supported modes.") - else: - modes = {'vision': 'VISION_MODE', 'script': 'SCRIPT_MODE', 'command': 'COMMAND_MODE', 'code': 'CODE_MODE', 'chat': 'CHAT_MODE'} - - self.INTERPRETER_MODE = mode.lower() - - for key in modes: - if self.INTERPRETER_MODE == key: - setattr(self, modes[key], True) - else: - setattr(self, modes[key], False) - display_markdown_message(f"Mode changed to '{self.INTERPRETER_MODE}'") - continue - - # MODEL - Command section. - elif any(command in task.lower() for command in ['/model ']): - model = task.split(' ')[1] - if model: - model_config_file = f"configs/{model}.config" - if not os.path.isfile(model_config_file): - display_markdown_message(f"Model {model} does not exists. Please check the model name using '/list' command.") - continue - else: - self.INTERPRETER_MODEL = model - display_markdown_message(f"Model changed to '{self.INTERPRETER_MODEL}'") - self.initialize_client() # Reinitialize the client with new model. - continue - - # LANGUAGE - Command section. - elif any(command in task.lower() for command in ['/language','/lang']): - split_task = task.split(' ') - if len(split_task) > 1: - language = split_task[1] - if language: - self.INTERPRETER_LANGUAGE = language - if language not in ['python', 'javascript']: - self.INTERPRETER_LANGUAGE = 'python' - display_markdown_message(f"The input language is not supported. Language changed to {self.INTERPRETER_LANGUAGE}") - display_markdown_message(f"Language changed to '{self.INTERPRETER_LANGUAGE}'") - continue - - # INSTALL - Command section. - elif any(command in task.lower() for command in ['/install']): - # get the package name after the command - package_name = task.split(' ')[1] - - # check if package name is not system module. - system_modules = self.package_manager.get_system_modules() - - # Skip installing system modules. - if package_name in system_modules: - self.logger.info(f"Package {package_name} is a system module.") - display_markdown_message(f"Package {package_name} is a system module.") - raise Exception(f"Package {package_name} is a system module.") - - if package_name: - self.logger.info(f"Installing package {package_name} on interpreter {self.INTERPRETER_LANGUAGE}") - self.package_manager.install_package(package_name, self.INTERPRETER_LANGUAGE) - continue - - # Get the prompt based on the mode. - else: - prompt = self.get_mode_prompt(task, os_name) - self.logger.info(f"Prompt init is '{prompt}'") - - # Check if the prompt is empty. - if not prompt: - display_markdown_message("Please **enter** a valid task.") - continue - - # Clean the responses - self.utility_manager._clean_responses() - - # Print Model and Mode information. - self.logger.info(f"Interpreter Mode: {self.INTERPRETER_MODE} Model: {self.INTERPRETER_MODEL}") - - # Check if prompt contains any file uploaded by user. - extracted_file_name = self.utility_manager.extract_file_name(prompt) - self.logger.info(f"Input prompt file name: '{extracted_file_name}'") - - if extracted_file_name is not None: - full_path = self.utility_manager.get_full_file_path(extracted_file_name) - self.logger.info(f"Input prompt full_path: '{full_path}'") - - # Check if image contains URL. - if 'http' in extracted_file_name or 'https' in extracted_file_name or 'www.' in extracted_file_name: - self.logger.info("Image contains URL Skipping the file processing.") - - else: - # Check if the file exists and is a file - if os.path.isfile(full_path): - # Check if file size is less than 50 KB - file_size_max = 50000 - file_size = os.path.getsize(full_path) - self.logger.info(f"Input prompt file_size: '{file_size}'") - if file_size < file_size_max: - try: - with open(full_path, 'r', encoding='utf-8') as file: - # Check if file extension is .json, .csv, or .xml - file_extension = os.path.splitext(full_path)[1].lower() - - if file_extension in ['.json','.xml']: - # Split by new line and read only 20 lines - file_data = '\n'.join(file.readline() for _ in range(20)) - self.logger.info(f"Input prompt JSON/XML file_data: '{str(file_data)}'") - - elif file_extension == '.csv': - # Read only headers of the csv file - file_data = self.utility_manager.read_csv_headers(full_path) - self.logger.info(f"Input prompt CSV file_data: '{str(file_data)}'") - - else: - file_data = file.read() - self.logger.info(f"Input prompt file_data: '{str(file_data)}'") - - if any(word in prompt.lower() for word in ['graph', 'graphs', 'chart', 'charts']): - prompt += "\n" + "This is file data from user input: " + str(file_data) + " use this to analyze the data." - self.logger.info(f"Input Prompt: '{prompt}'") - else: - self.logger.info("The prompt does not contain both 'graph' and 'chart'.") - except Exception as exception: - self.logger.error(f"Error reading file: {exception}") - else: - self.logger.warning("File size is greater.") - else: - self.logger.error("File does not exist or is not a file.") - else: - self.logger.info("No file name found in the prompt.") - - # If graph were requested. - if any(word in prompt.lower() for word in ['graph', 'graphs']): - if self.INTERPRETER_LANGUAGE == 'python': - prompt += "\n" + "using Python use Matplotlib save the graph in file called 'graph.png'" - elif self.INTERPRETER_LANGUAGE == 'javascript': - prompt += "\n" + "using JavaScript use Chart.js save the graph in file called 'graph.png'" - - # if Chart were requested - if any(word in prompt.lower() for word in ['chart', 'charts', 'plot', 'plots']): - if self.INTERPRETER_LANGUAGE == 'python': - prompt += "\n" + "using Python use Plotly save the chart in file called 'chart.png'" - elif self.INTERPRETER_LANGUAGE == 'javascript': - prompt += "\n" + "using JavaScript use Chart.js save the chart in file called 'chart.png'" - - # if Table were requested - if 'table' in prompt.lower(): - if self.INTERPRETER_LANGUAGE == 'python': - prompt += "\n" + "using Python use Pandas save the table in file called 'table.md'" - elif self.INTERPRETER_LANGUAGE == 'javascript': - prompt += "\n" + "using JavaScript use DataTables save the table in file called 'table.html'" - - # Start the LLM Request. - self.logger.info(f"Prompt: {prompt}") - - # Add the history as memory. - if self.INTERPRETER_HISTORY and self.INTERPRETER_MODE == 'chat': - self.history = self.history_manager.get_chat_history(self.history_count) - - elif self.INTERPRETER_HISTORY and self.INTERPRETER_MODE == 'code': - self.history = self.history_manager.get_code_history(self.history_count) - - generated_output = self.generate_content(prompt, self.history, config_values=self.config_values,image_file=extracted_file_name) - - # No extra processing for Vision mode. - if self.INTERPRETER_MODE in ['vision','chat']: - display_markdown_message(f"{generated_output}") - continue - - # Extract the code from the generated output. - self.logger.info(f"Generated output type {type(generated_output)}") - code_snippet = self.code_interpreter.extract_code(generated_output, start_sep, end_sep, skip_first_line,self.CODE_MODE) - - # Display the extracted code. - self.logger.info(f"Extracted code: {code_snippet[:50]}") - - if self.DISPLAY_CODE: - display_code(code_snippet) - self.logger.info("Code extracted successfully.") - - if code_snippet: - current_time = time.strftime("%Y_%m_%d-%H_%M_%S", time.localtime()) - - if self.INTERPRETER_LANGUAGE == 'javascript' and self.SAVE_CODE and self.CODE_MODE: - self.code_interpreter.save_code(f"output/code_{current_time}.js", code_snippet) - self.logger.info(f"JavaScript code saved successfully.") - - elif self.INTERPRETER_LANGUAGE == 'python' and self.SAVE_CODE and self.CODE_MODE: - self.code_interpreter.save_code(f"output/code_{current_time}.py", code_snippet) - self.logger.info(f"{self.INTERPRETER_LANGUAGE} code saved successfully.") - - elif self.SAVE_CODE and self.COMMAND_MODE: - self.code_interpreter.save_code(f"output/command_{current_time}.txt", code_snippet) - self.logger.info(f"Command saved successfully.") + self.logger.info(f"Executing prompt from file {prompt_file_path}") + task = self.utility_manager.read_file(prompt_file_path) + elif prompt_confirmation.lower() == 'n': + self.logger.info(f"Waiting for user confirmation to execute prompt from file.") + print("Waiting for user confirmation to execute prompt from file.") + self.utility_manager.clear_screen() + continue + elif prompt_confirmation.lower() == 'p': + self.INTERPRETER_PROMPT_INPUT = True + self.INTERPRETER_PROMPT_FILE = False + self.logger.info(f"Changing input mode to prompt from file.") + self.utility_manager.clear_screen() + continue + elif prompt_confirmation.lower() == 'c': + self.logger.info(f"Changing input mode to command from file.") + task = input("> ") + else: + # Invalid input mode (0x000022) + self.logger.error("Invalid input mode.") + self.utility_manager.clear_screen() + continue + + # EXIT - Command section. + if task.lower() == '/exit': + break + + # HELP - Command section. + elif task.lower() == '/help': + self.utility_manager.display_help() + continue + + # CLEAR - Command section. + elif task.lower() == '/clear': + self.utility_manager.clear_screen() + continue + + # VERSION - Command section. + elif task.lower() == '/version': + self.utility_manager.display_version(self.interpreter_version) + continue + + # PROMPT - Command section. + elif task.lower() == '/prompt': + if self.INTERPRETER_PROMPT_INPUT: + self.INTERPRETER_PROMPT_INPUT = False + self.INTERPRETER_PROMPT_FILE = True + self.logger.info(f"Input mode changed to File.") + else: + self.INTERPRETER_PROMPT_INPUT = True + self.INTERPRETER_PROMPT_FILE = False + self.logger.info(f"Input mode changed to Prompt.") + continue + + # HISTORY - Command section. + elif task.lower() == '/history': + self.INTERPRETER_HISTORY = not self.INTERPRETER_HISTORY + display_markdown_message(f"History is {'enabled' if self.INTERPRETER_HISTORY else 'disabled'}") + continue + + # SHELL - Command section. + elif any(command in task.lower() for command in ['/shell ']): + shell_command = shlex.split(task)[1:] + shell_command = ' '.join(shell_command) + shell_output, shell_error = self.code_interpreter.execute_command(shell_command) + if shell_output: + self.logger.info(f"Shell command executed successfully.") + display_code(shell_output) + self.logger.info(f"Output: {shell_output[:100]}") + elif shell_error: + self.logger.info(f"Shell command executed with error.") + display_markdown_message(f"Error: {shell_error}") + continue + + # LOG - Command section. + elif task.lower() == '/log': + # Toggle the log level to Verbose/Silent. + + logger_mode = Logger.get_current_level() + logger_mode = logger_mode.lower() + + if logger_mode == 'debug': + Logger.set_silent_mode() + display_markdown_message(f"Logger mode changed to **Silent**.") + else: + Logger.set_verbose_mode() + display_markdown_message(f"Logger mode changed to **Verbose**.") + continue + + # LIST - Command section. + elif task.lower() == '/list': + # Get the models info + + # Reading all the config files in the configs folder. + configs_path = os.path.join(os.getcwd(), 'configs') + configs_files = [file for file in os.listdir(configs_path) if file.endswith('.config')] + + # Removing all extensions from the list. + configs_files = [os.path.splitext(file)[0] for file in configs_files] + + # Printing the models info. + print('Available models:\n') + for index, model in enumerate(configs_files, 1): + print(f'{index}. {model}') + print('', end='\n') + + # Print all the available modes. + print('Available modes:\n') + for index, mode in enumerate(['code','script','command','vision','chat'], 1): + print(f'{index}. {mode}',end='\n') + + # Print all the available languages. + print('\nAvailable languages:\n') + for index, language in enumerate(['python','javascript'], 1): + print(f'{index}. {language}') + + continue + + # UPGRAGE - Command section. + elif task.lower() == '/upgrade': + self.utility_manager.upgrade_interpreter() + continue + + # EXECUTE - Command section. + elif task.lower() == '/execute': + self.execute_last_code(os_name) + continue + + # SAVE - Command section. + elif task.lower() == '/save': + latest_code_extension = 'py' if self.INTERPRETER_LANGUAGE == 'python' else 'js' + latest_code_name = f"output/code_{time.strftime('%Y_%m_%d-%H_%M_%S', time.localtime())}." + latest_code_extension + latest_code = code_snippet + self.code_interpreter.save_code(latest_code_name, latest_code) + display_markdown_message(f"Code saved successfully to {latest_code_name}.") + continue + + # EDIT - Command section. + elif task.lower() == '/edit': + code_file, code_snippet = self.utility_manager.get_code_history(self.INTERPRETER_LANGUAGE) + + # Get the OS platform. + os_platform = self.utility_manager.get_os_platform() + + # Check if user wants to open in vim? + vim_open = input("Open the code in vim editor (Y/N):") + if vim_open.lower() == 'y': + self.logger.info(f"Opening code in **vim** editor {code_file.name if not isinstance(code_file, str) else code_file}") + subprocess.call(['vim', code_file.name if not isinstance(code_file, str) else code_file]) + continue + else: + # Open the code in default editor. + if os_platform[0].lower() == 'macos': + self.logger.info(f"Opening code in default editor {code_file.name if not isinstance(code_file, str) else code_file}") + subprocess.call(('open', code_file.name if not isinstance(code_file, str) else code_file)) + elif os_platform[0].lower() == 'linux': + subprocess.call(('xdg-open', code_file.name if not isinstance(code_file, str) else code_file)) + elif os_platform[0].lower() == 'windows': + os.startfile(code_file.name if not isinstance(code_file, str) else code_file) + continue + + # DEBUG - Command section. + elif task.lower() == '/debug': + + if not code_error: + code_error = code_output + + if not code_error: + display_markdown_message(f"Error: No error found in the code to fix.") + continue + + debug_prompt = f"Fix the errors in {self.INTERPRETER_LANGUAGE} language.\nCode is \n'{code_snippet}'\nAnd Error is \n'{code_error}'\n" + f"give me output only in code and no other text or explanation. And comment in code where you fixed the error.\n" + + # Start the LLM Request. + self.logger.info(f"Debug Prompt: {debug_prompt}") + generated_output = self.generate_content(debug_prompt, self.history, config_values=self.config_values, image_file=extracted_file_name) + + # Extract the code from the generated output. + self.logger.info(f"Generated output type {type(generated_output)}") + code_snippet = self.code_interpreter.extract_code(generated_output, start_sep, end_sep, skip_first_line, self.CODE_MODE) + + # Display the extracted code. + self.logger.info(f"Extracted code: {code_snippet[:50]}") + + if self.DISPLAY_CODE: + display_code(code_snippet) + self.logger.info("Code extracted successfully.") + + # Execute the code if the user has selected. + code_output, code_error = self.execute_code(code_snippet, os_name) + + if code_output: + self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed successfully.") + display_code(code_output) + self.logger.info(f"Output: {code_output[:100]}") + elif code_error: + self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed with error.") + display_markdown_message(f"Error: {code_error}") + continue + + # MODE - Command section. + elif any(command in task.lower() for command in ['/mode ']): + mode = task.split(' ')[1] + if mode: + if not mode.lower() in ['code','script','command','vision','chat']: + mode = 'code' + display_markdown_message(f"The input mode is not supported. Mode changed to {mode}," + "\nUse '/list' command to get the list of supported modes.") + else: + modes = {'vision': 'VISION_MODE', 'script': 'SCRIPT_MODE', 'command': 'COMMAND_MODE', 'code': 'CODE_MODE', 'chat': 'CHAT_MODE'} + + self.INTERPRETER_MODE = mode.lower() + + for key in modes: + if self.INTERPRETER_MODE == key: + setattr(self, modes[key], True) + else: + setattr(self, modes[key], False) + display_markdown_message(f"Mode changed to '{self.INTERPRETER_MODE}'") + continue + + # MODEL - Command section. + elif any(command in task.lower() for command in ['/model ']): + model = task.split(' ')[1] + if model: + model_config_file = f"configs/{model}.config" + if not os.path.isfile(model_config_file): + display_markdown_message(f"Model {model} does not exists. Please check the model name using '/list' command.") + continue + else: + self.INTERPRETER_MODEL = model + display_markdown_message(f"Model changed to '{self.INTERPRETER_MODEL}'") + self.initialize_client() # Reinitialize the client with new model. + continue + + # LANGUAGE - Command section. + elif any(command in task.lower() for command in ['/language','/lang']): + split_task = task.split(' ') + if len(split_task) > 1: + language = split_task[1] + if language: + self.INTERPRETER_LANGUAGE = language + if language not in ['python', 'javascript']: + self.INTERPRETER_LANGUAGE = 'python' + display_markdown_message(f"The input language is not supported. Language changed to {self.INTERPRETER_LANGUAGE}") + display_markdown_message(f"Language changed to '{self.INTERPRETER_LANGUAGE}'") + continue + + # INSTALL - Command section. + elif any(command in task.lower() for command in ['/install']): + # get the package name after the command + package_name = task.split(' ')[1] + + # check if package name is not system module. + system_modules = self.package_manager.get_system_modules() + + # Skip installing system modules. + if package_name in system_modules: + self.logger.info(f"Package {package_name} is a system module.") + display_markdown_message(f"Package {package_name} is a system module.") + raise Exception(f"Package {package_name} is a system module.") + + if package_name: + self.logger.info(f"Installing package {package_name} on interpreter {self.INTERPRETER_LANGUAGE}") + self.package_manager.install_package(package_name, self.INTERPRETER_LANGUAGE) + continue + + # Get the prompt based on the mode. + else: + prompt = self.get_mode_prompt(task, os_name) + self.logger.info(f"Prompt init is '{prompt}'") + + # Check if the prompt is empty. + if not prompt: + display_markdown_message("Please **enter** a valid task.") + continue + + # Clean the responses + self.utility_manager._clean_responses() + + # Print Model and Mode information. + self.logger.info(f"Interpreter Mode: {self.INTERPRETER_MODE} Model: {self.INTERPRETER_MODEL}") + + # Check if prompt contains any file uploaded by user. + extracted_file_name = self.utility_manager.extract_file_name(prompt) + self.logger.info(f"Input prompt file name: '{extracted_file_name}'") + + if extracted_file_name is not None: + full_path = self.utility_manager.get_full_file_path(extracted_file_name) + self.logger.info(f"Input prompt full_path: '{full_path}'") + + # Check if image contains URL. + if 'http' in extracted_file_name or 'https' in extracted_file_name or 'www.' in extracted_file_name: + self.logger.info("Image contains URL Skipping the file processing.") + + else: + # Check if the file exists and is a file + if os.path.isfile(full_path): + # Check if file size is less than 50 KB + file_size_max = 50000 + file_size = os.path.getsize(full_path) + self.logger.info(f"Input prompt file_size: '{file_size}'") + if file_size < file_size_max: + try: + with open(full_path, 'r', encoding='utf-8') as file: + # Check if file extension is .json, .csv, or .xml + file_extension = os.path.splitext(full_path)[1].lower() + + if file_extension in ['.json','.xml']: + # Split by new line and read only 20 lines + file_data = '\n'.join(file.readline() for _ in range(20)) + self.logger.info(f"Input prompt JSON/XML file_data: '{str(file_data)}'") + + elif file_extension == '.csv': + # Read only headers of the csv file + file_data = self.utility_manager.read_csv_headers(full_path) + self.logger.info(f"Input prompt CSV file_data: '{str(file_data)}'") + + else: + file_data = file.read() + self.logger.info(f"Input prompt file_data: '{str(file_data)}'") + + if any(word in prompt.lower() for word in ['graph', 'graphs', 'chart', 'charts']): + prompt += "\n" + "This is file data from user input: " + str(file_data) + " use this to analyze the data." + self.logger.info(f"Input Prompt: '{prompt}'") + else: + self.logger.info("The prompt does not contain both 'graph' and 'chart'.") + except Exception as exception: + self.logger.error(f"Error reading file: {exception}") + else: + self.logger.warning("File size is greater.") + else: + self.logger.error("File does not exist or is not a file.") + else: + self.logger.info("No file name found in the prompt.") + + # If graph were requested. + if any(word in prompt.lower() for word in ['graph', 'graphs']): + if self.INTERPRETER_LANGUAGE == 'python': + prompt += "\n" + "using Python use Matplotlib save the graph in file called 'graph.png'" + elif self.INTERPRETER_LANGUAGE == 'javascript': + prompt += "\n" + "using JavaScript use Chart.js save the graph in file called 'graph.png'" + + # if Chart were requested + if any(word in prompt.lower() for word in ['chart', 'charts', 'plot', 'plots']): + if self.INTERPRETER_LANGUAGE == 'python': + prompt += "\n" + "using Python use Plotly save the chart in file called 'chart.png'" + elif self.INTERPRETER_LANGUAGE == 'javascript': + prompt += "\n" + "using JavaScript use Chart.js save the chart in file called 'chart.png'" + + # if Table were requested + if 'table' in prompt.lower(): + if self.INTERPRETER_LANGUAGE == 'python': + prompt += "\n" + "using Python use Pandas save the table in file called 'table.md'" + elif self.INTERPRETER_LANGUAGE == 'javascript': + prompt += "\n" + "using JavaScript use DataTables save the table in file called 'table.html'" + + # Start the LLM Request. + self.logger.info(f"Prompt: {prompt}") + + # Add the history as memory. + if self.INTERPRETER_HISTORY and self.INTERPRETER_MODE == 'chat': + self.history = self.history_manager.get_chat_history(self.history_count) + + elif self.INTERPRETER_HISTORY and self.INTERPRETER_MODE == 'code': + self.history = self.history_manager.get_code_history(self.history_count) + + generated_output = self.generate_content(prompt, self.history, config_values=self.config_values,image_file=extracted_file_name) + + # No extra processing for Vision mode. + if self.INTERPRETER_MODE in ['vision','chat']: + display_markdown_message(f"{generated_output}") + continue + + # Extract the code from the generated output. + self.logger.info(f"Generated output type {type(generated_output)}") + code_snippet = self.code_interpreter.extract_code(generated_output, start_sep, end_sep, skip_first_line,self.CODE_MODE) + + # Display the extracted code. + self.logger.info(f"Extracted code: {code_snippet[:50]}") + + if self.DISPLAY_CODE: + display_code(code_snippet) + self.logger.info("Code extracted successfully.") + + if code_snippet: + current_time = time.strftime("%Y_%m_%d-%H_%M_%S", time.localtime()) + + if self.INTERPRETER_LANGUAGE == 'javascript' and self.SAVE_CODE and self.CODE_MODE: + self.code_interpreter.save_code(f"output/code_{current_time}.js", code_snippet) + self.logger.info(f"JavaScript code saved successfully.") + + elif self.INTERPRETER_LANGUAGE == 'python' and self.SAVE_CODE and self.CODE_MODE: + self.code_interpreter.save_code(f"output/code_{current_time}.py", code_snippet) + self.logger.info(f"{self.INTERPRETER_LANGUAGE} code saved successfully.") + + elif self.SAVE_CODE and self.COMMAND_MODE: + self.code_interpreter.save_code(f"output/command_{current_time}.txt", code_snippet) + self.logger.info(f"Command saved successfully.") - elif self.SAVE_CODE and self.SCRIPT_MODE: - self.code_interpreter.save_code(f"output/script_{current_time}.txt", code_snippet) - self.logger.info(f"Script saved successfully.") - - # Execute the code if the user has selected. - code_output, code_error = self.execute_code(code_snippet, os_name) - - if code_output: - self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed successfully.") - display_code(code_output) - self.logger.info(f"Output: {code_output[:100]}") - elif code_error: - self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed with error.") - display_markdown_message(f"Error: {code_error}") - - # install Package on error. - error_messages = ["ModuleNotFound", "ImportError", "No module named", "Cannot find module"] - if code_error is not None and any(error_message in code_error for error_message in error_messages): - package_name = self.package_manager.extract_package_name(code_error, self.INTERPRETER_LANGUAGE) - - # check if package name is not system module. - system_modules = self.package_manager.get_system_modules() - - # Skip installing system modules. - if package_name in system_modules: - self.logger.info(f"Package {package_name} is a system module.") - display_markdown_message(f"Package {package_name} is a system module.") - raise Exception(f"Package {package_name} is a system module.") - - if package_name: - self.logger.info(f"Installing package {package_name} on interpreter {self.INTERPRETER_LANGUAGE}") - self.package_manager.install_package(package_name, self.INTERPRETER_LANGUAGE) - - # Wait and Execute the code again. - time.sleep(3) - code_output, code_error = self.execute_code(code_snippet, os_name) - if code_output: - self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed successfully.") - display_code(code_output) - self.logger.info(f"Output: {code_output[:100]}") - elif code_error: - self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed with error.") - display_markdown_message(f"Error: {code_error}") - - try: - # Check if graph.png exists and open it. - self.utility_manager._open_resource_file('graph.png') - - # Check if chart.png exists and open it. - self.utility_manager._open_resource_file('chart.png') - - # Check if table.md exists and open it. - self.utility_manager._open_resource_file('table.md') - except Exception as exception: - display_markdown_message(f"Error in opening resource files: {str(exception)}") - - self.history_manager.save_history_json(task, self.INTERPRETER_MODE, os_name, self.INTERPRETER_LANGUAGE, prompt, code_snippet,code_output, self.INTERPRETER_MODEL) - - except Exception as exception: - self.logger.error(f"An error occurred: {str(exception)}") - raise \ No newline at end of file + elif self.SAVE_CODE and self.SCRIPT_MODE: + self.code_interpreter.save_code(f"output/script_{current_time}.txt", code_snippet) + self.logger.info(f"Script saved successfully.") + + # Execute the code if the user has selected. + code_output, code_error = self.execute_code(code_snippet, os_name) + + if code_output: + self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed successfully.") + display_code(code_output) + self.logger.info(f"Output: {code_output[:100]}") + elif code_error: + self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed with error.") + display_markdown_message(f"Error: {code_error}") + + # install Package on error. + error_messages = ["ModuleNotFound", "ImportError", "No module named", "Cannot find module"] + if code_error is not None and any(error_message in code_error for error_message in error_messages): + package_name = self.package_manager.extract_package_name(code_error, self.INTERPRETER_LANGUAGE) + + # check if package name is not system module. + system_modules = self.package_manager.get_system_modules() + + # Skip installing system modules. + if package_name in system_modules: + self.logger.info(f"Package {package_name} is a system module.") + display_markdown_message(f"Package {package_name} is a system module.") + raise Exception(f"Package {package_name} is a system module.") + + if package_name: + self.logger.info(f"Installing package {package_name} on interpreter {self.INTERPRETER_LANGUAGE}") + self.package_manager.install_package(package_name, self.INTERPRETER_LANGUAGE) + + # Wait and Execute the code again. + time.sleep(3) + code_output, code_error = self.execute_code(code_snippet, os_name) + if code_output: + self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed successfully.") + display_code(code_output) + self.logger.info(f"Output: {code_output[:100]}") + elif code_error: + self.logger.info(f"{self.INTERPRETER_LANGUAGE} code executed with error.") + display_markdown_message(f"Error: {code_error}") + + try: + # Check if graph.png exists and open it. + self.utility_manager._open_resource_file('graph.png') + + # Check if chart.png exists and open it. + self.utility_manager._open_resource_file('chart.png') + + # Check if table.md exists and open it. + self.utility_manager._open_resource_file('table.md') + except Exception as exception: + display_markdown_message(f"Error in opening resource files: {str(exception)}") + + self.history_manager.save_history_json(task, self.INTERPRETER_MODE, os_name, self.INTERPRETER_LANGUAGE, prompt, code_snippet,code_output, self.INTERPRETER_MODEL) + + except Exception as exception: + self.logger.error(f"An error occurred: {str(exception)}") + raise \ No newline at end of file diff --git a/libs/logger.py b/libs/logger.py index 0628cb0..9e9f46f 100644 --- a/libs/logger.py +++ b/libs/logger.py @@ -2,67 +2,67 @@ from logging.handlers import RotatingFileHandler class Logger: - _logger = None + _logger = None - @staticmethod - def initialize_logger(filename=None, verbose=False): - if Logger._logger is None: - # Define the logging format - log_format = ("%(asctime)s [%(levelname)s] [%(filename)s:%(lineno)d] [%(funcName)s] - %(message)s") + @staticmethod + def initialize_logger(filename=None, verbose=False): + if Logger._logger is None: + # Define the logging format + log_format = ("%(asctime)s [%(levelname)s] [%(filename)s:%(lineno)d] [%(funcName)s] - %(message)s") - # Create a rotating file handler that will manage log file sizes and backups - file_handler = RotatingFileHandler(filename, maxBytes=5*1024*1024) # 5MB per file - file_handler.setFormatter(logging.Formatter(log_format)) + # Create a rotating file handler that will manage log file sizes and backups + file_handler = RotatingFileHandler(filename, maxBytes=5*1024*1024) # 5MB per file + file_handler.setFormatter(logging.Formatter(log_format)) - # add logger for console - console_handler = logging.StreamHandler() - console_handler.setFormatter(logging.Formatter(log_format)) + # add logger for console + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter(log_format)) - # Create the logger - Logger._logger = logging.getLogger() - Logger._logger.setLevel(logging.INFO) + # Create the logger + Logger._logger = logging.getLogger() + Logger._logger.setLevel(logging.INFO) - # Check if the logger already has handlers, if not, add the file handler - if not Logger._logger.handlers: - Logger._logger.addHandler(file_handler) - Logger._logger.addHandler(console_handler) + # Check if the logger already has handlers, if not, add the file handler + if not Logger._logger.handlers: + Logger._logger.addHandler(file_handler) + Logger._logger.addHandler(console_handler) - # Set the logger to verbose or silent mode - if verbose: - Logger.set_verbose_mode() - else: - Logger.set_silent_mode() + # Set the logger to verbose or silent mode + if verbose: + Logger.set_verbose_mode() + else: + Logger.set_silent_mode() - return Logger._logger + return Logger._logger - @staticmethod - def set_level_to_debug(): - Logger._logger.setLevel(logging.DEBUG) + @staticmethod + def set_level_to_debug(): + Logger._logger.setLevel(logging.DEBUG) - @staticmethod - def set_level_to_info(): - Logger._logger.setLevel(logging.INFO) + @staticmethod + def set_level_to_info(): + Logger._logger.setLevel(logging.INFO) - @staticmethod - def set_level_to_warning(): - Logger._logger.setLevel(logging.WARNING) + @staticmethod + def set_level_to_warning(): + Logger._logger.setLevel(logging.WARNING) - @staticmethod - def set_level_to_error(): - Logger._logger.setLevel(logging.ERROR) + @staticmethod + def set_level_to_error(): + Logger._logger.setLevel(logging.ERROR) - @staticmethod - def set_level_to_critical(): - Logger._logger.setLevel(logging.CRITICAL) - - @staticmethod - def set_verbose_mode(): - Logger._logger.setLevel(logging.DEBUG) + @staticmethod + def set_level_to_critical(): + Logger._logger.setLevel(logging.CRITICAL) + + @staticmethod + def set_verbose_mode(): + Logger._logger.setLevel(logging.DEBUG) - @staticmethod - def set_silent_mode(): - Logger._logger.setLevel(logging.ERROR) - - @staticmethod - def get_current_level(): - return logging.getLevelName(Logger._logger.level) \ No newline at end of file + @staticmethod + def set_silent_mode(): + Logger._logger.setLevel(logging.ERROR) + + @staticmethod + def get_current_level(): + return logging.getLevelName(Logger._logger.level) \ No newline at end of file diff --git a/libs/markdown_code.py b/libs/markdown_code.py index fad60bf..2963962 100644 --- a/libs/markdown_code.py +++ b/libs/markdown_code.py @@ -6,71 +6,71 @@ from pygments.formatters import TerminalFormatter def display_markdown_message(message): - """ - Display markdown message. Works with multiline strings with lots of indentation. - Will automatically make single line > tags beautiful. - """ + """ + Display markdown message. Works with multiline strings with lots of indentation. + Will automatically make single line > tags beautiful. + """ - for line in message.split("\n"): - line = line.strip() - if line == "": - print("") - elif line == "---": - rich_print(Rule(style="white")) - else: - rich_print(Markdown(line)) + for line in message.split("\n"): + line = line.strip() + if line == "": + print("") + elif line == "---": + rich_print(Rule(style="white")) + else: + rich_print(Markdown(line)) - if "\n" not in message and message.startswith(">"): - # Aesthetic choice. For these tags, they need a space below them - print("") - + if "\n" not in message and message.startswith(">"): + # Aesthetic choice. For these tags, they need a space below them + print("") + def display_code(code: list, language: str = "python"): - try: - syntax = Syntax(code, language, theme="monokai", line_numbers=True) - rich_print(syntax,end="",flush=True) - except Exception as exception: - print(f"An error occurred: {exception}") - + try: + syntax = Syntax(code, language, theme="monokai", line_numbers=True) + rich_print(syntax,end="",flush=True) + except Exception as exception: + print(f"An error occurred: {exception}") + class CustomFormatter(TerminalFormatter): - def format(self, tokensource, outfile): - # call the parent method - super().format(tokensource, outfile) - # remove the trailing newline from the output file - outfile.seek(outfile.tell() - 1) - if outfile.read() == '\n': - outfile.truncate() + def format(self, tokensource, outfile): + # call the parent method + super().format(tokensource, outfile) + # remove the trailing newline from the output file + outfile.seek(outfile.tell() - 1) + if outfile.read() == '\n': + outfile.truncate() from rich.console import Console def display_code_stream(stream): - """ - This function prints each token in the stream as a Python syntax highlighted code block. - :param stream: The stream of text to be printed. - """ - try: - code = "" - console = Console(record=True) # Create a Console object that records print calls + """ + This function prints each token in the stream as a Python syntax highlighted code block. + :param stream: The stream of text to be printed. + """ + try: + code = "" + console = Console(record=True) # Create a Console object that records print calls - for output in stream: - output_code = output.token.text - output_code = output_code.replace("```","") if output_code else output_code - output_code = output_code.replace("","") if output_code else output_code - if output_code == '\n': - highlighted_code = Syntax(code, "python", theme="monokai", line_numbers=False,word_wrap=True) - console.print(highlighted_code) # Print to the console object - code = "" - else: - code += output_code + "" - time.sleep(0.03) + for output in stream: + output_code = output.token.text + output_code = output_code.replace("```","") if output_code else output_code + output_code = output_code.replace("","") if output_code else output_code + if output_code == '\n': + highlighted_code = Syntax(code, "python", theme="monokai", line_numbers=False,word_wrap=True) + console.print(highlighted_code) # Print to the console object + code = "" + else: + code += output_code + "" + time.sleep(0.03) - # Check if there is any remaining code that hasn't been printed - if code: - highlighted_code = Syntax(code, "python", theme="monokai", line_numbers=False,word_wrap=True) - console.print(highlighted_code) # Print to the console object - code = "" - - return console.export_text().strip() # Return the recorded text - except Exception as exception: - print(f"Error while printing as markdown: {exception}") \ No newline at end of file + # Check if there is any remaining code that hasn't been printed + if code: + highlighted_code = Syntax(code, "python", theme="monokai", line_numbers=False,word_wrap=True) + console.print(highlighted_code) # Print to the console object + code = "" + + return console.export_text().strip() # Return the recorded text + except Exception as exception: + print(f"Error while printing as markdown: {exception}") \ No newline at end of file diff --git a/libs/package_manager.py b/libs/package_manager.py index 8ba4034..49ed119 100644 --- a/libs/package_manager.py +++ b/libs/package_manager.py @@ -5,197 +5,197 @@ import stdlib_list class PackageManager: - def __init__(self): - self.pip_command = "pip" - self.pip3_command = "pip3" - self.npm_command = "npm" - self.logger = logging.getLogger(__name__) - handler = logging.StreamHandler() - formatter = logging.Formatter('%(asctime)s [%(levelname)s] [%(name)s:%(lineno)d] [%(funcName)s] - %(message)s') - handler.setFormatter(formatter) - self.logger.addHandler(handler) - self.logger.setLevel(logging.INFO) + def __init__(self): + self.pip_command = "pip" + self.pip3_command = "pip3" + self.npm_command = "npm" + self.logger = logging.getLogger(__name__) + handler = logging.StreamHandler() + formatter = logging.Formatter('%(asctime)s [%(levelname)s] [%(name)s:%(lineno)d] [%(funcName)s] - %(message)s') + handler.setFormatter(formatter) + self.logger.addHandler(handler) + self.logger.setLevel(logging.INFO) - def install_package(self, package_name, language): - if language == "python": - if not self._check_package_exists_pip(package_name): - exception = ValueError(f"Package {package_name} does not exist") - self.logger.error(exception) - raise exception - - if not self._is_package_installed(package_name,"pip"): - try: - # Try to install the package using pip - self._install_package_with_pip(package_name) - except subprocess.CalledProcessError: - try: - # If pip fails, try to install the package using pip3 - self._install_package_with_pip3(package_name) - except subprocess.CalledProcessError as exception: - self.logger.error(f"Failed to install package with both pip and pip3: {package_name}") - raise exception - else: - self.logger.info(f"Package {package_name} is already installed") - elif language == "javascript": - try: - if not self._check_package_exists_npm(package_name): - exception = ValueError(f"Package {package_name} does not exist") - self.logger.error(exception) - raise exception - - if not self._is_package_installed(package_name,"npm"): - try: - # Try to install the package using npm - self._install_package_with_npm(package_name) - except subprocess.CalledProcessError as exception: - raise exception - - except subprocess.CalledProcessError as exception: - self.logger.error(f"Failed to install package with npm: {package_name}") - raise exception - else: - exception = ValueError("Invalid language selected.") - self.logger.error(exception) - raise exception - - def extract_package_name(self,error,language): - if language == "python": - return self._extract_python_package_name(error) - elif language == "javascript": - return self._extract_javascript_package_name(error) - else: - exception = ValueError("Invalid language selected.") - self.logger.error(exception) - raise exception + def install_package(self, package_name, language): + if language == "python": + if not self._check_package_exists_pip(package_name): + exception = ValueError(f"Package {package_name} does not exist") + self.logger.error(exception) + raise exception + + if not self._is_package_installed(package_name,"pip"): + try: + # Try to install the package using pip + self._install_package_with_pip(package_name) + except subprocess.CalledProcessError: + try: + # If pip fails, try to install the package using pip3 + self._install_package_with_pip3(package_name) + except subprocess.CalledProcessError as exception: + self.logger.error(f"Failed to install package with both pip and pip3: {package_name}") + raise exception + else: + self.logger.info(f"Package {package_name} is already installed") + elif language == "javascript": + try: + if not self._check_package_exists_npm(package_name): + exception = ValueError(f"Package {package_name} does not exist") + self.logger.error(exception) + raise exception + + if not self._is_package_installed(package_name,"npm"): + try: + # Try to install the package using npm + self._install_package_with_npm(package_name) + except subprocess.CalledProcessError as exception: + raise exception + + except subprocess.CalledProcessError as exception: + self.logger.error(f"Failed to install package with npm: {package_name}") + raise exception + else: + exception = ValueError("Invalid language selected.") + self.logger.error(exception) + raise exception + + def extract_package_name(self,error,language): + if language == "python": + return self._extract_python_package_name(error) + elif language == "javascript": + return self._extract_javascript_package_name(error) + else: + exception = ValueError("Invalid language selected.") + self.logger.error(exception) + raise exception - def get_system_modules(self): - try: - # Get a list of all module names in the standard library - stdlib = stdlib_list.stdlib_list() - return stdlib - except Exception as exception: - raise ValueError("An error occurred while getting module names") from exception - - def _install_package_with_pip(self, package_name): - try: - subprocess.check_call([self.pip_command, "install", package_name]) - self.logger.info(f"Successfully installed package with pip: {package_name}") - except subprocess.CalledProcessError as exception: - self.logger.error(f"Failed to install package with pip: {package_name}") - raise exception + def get_system_modules(self): + try: + # Get a list of all module names in the standard library + stdlib = stdlib_list.stdlib_list() + return stdlib + except Exception as exception: + raise ValueError("An error occurred while getting module names") from exception + + def _install_package_with_pip(self, package_name): + try: + subprocess.check_call([self.pip_command, "install", package_name]) + self.logger.info(f"Successfully installed package with pip: {package_name}") + except subprocess.CalledProcessError as exception: + self.logger.error(f"Failed to install package with pip: {package_name}") + raise exception - def _install_package_with_pip3(self, package_name): - try: - subprocess.check_call([self.pip3_command, "install", package_name]) - self.logger.info(f"Successfully installed package with pip3: {package_name}") - except subprocess.CalledProcessError as exception: - self.logger.error(f"Failed to install package with pip3: {package_name}") - raise exception - - def _install_package_with_npm(self, package_name): - try: - subprocess.check_call([self.npm_command, "install", package_name]) - self.logger.info(f"Successfully installed package with npm: {package_name}") - except subprocess.CalledProcessError as exception: - self.logger.error(f"Failed to install package with npm: {package_name}") - raise exception - - def _is_package_installed(self, package_name, package_manager): - if package_manager == "pip": - try: - subprocess.check_call([self.pip_command, "show", package_name]) - self.logger.info(f"Package {package_name} is installed") - return True - except subprocess.CalledProcessError: - try: - subprocess.check_call([self.pip3_command, "show", package_name]) - self.logger.info(f"Package {package_name} is installed") - return True - except subprocess.CalledProcessError: - self.logger.info(f"Package {package_name} is not installed") - return False - elif package_manager == "npm": - try: - subprocess.check_call([self.npm_command, "list", "-g", package_name]) - self.logger.info(f"Package {package_name} is installed") - return True - except subprocess.CalledProcessError: - self.logger.info(f"Package {package_name} is not installed") - return False - else: - exception = ValueError("Invalid package manager selected.") - self.logger.error(exception) - raise exception - - def _install_package_with_npm(self, package_name): - try: - subprocess.check_call([self.npm_command, "install", package_name]) - self.logger.info(f"Successfully installed package with npm: {package_name}") - except subprocess.CalledProcessError as exception: - self.logger.error(f"Failed to install package with npm: {package_name}") - raise exception - - - def _extract_python_package_name(self, error_message): - # Regular expression pattern to match the error message - pattern = r"ModuleNotFoundError: No module named '(\w+)'|ModuleNotFoundError: '(\w+)'" - match = re.search(pattern, error_message) - if match: - # Extract the package name from the error message - package_name = match.group(1) if match.group(1) else match.group(2) - return package_name - else: - # If the package name could not be extracted, log an error and raise an exception - exception = ValueError("Could not extract package name from error message") - self.logger.error(exception) - raise exception - - def _extract_javascript_package_name(self,error_message): - try: - lines = error_message.split('\n') - for line in lines: - if line.startswith("Error: Cannot find module"): - package_name = line.split("'")[1] - return package_name - return None - except Exception as exception: - self.logger.error(f"Failed to extract package name from error message: {exception}") - raise exception - - def _check_package_exists_pip(self,package_name): - import requests - from bs4 import BeautifulSoup - - try: - # Send a GET request to the PyPI search page - response = requests.get(f"https://pypi.org/search/?q={package_name}") - response.raise_for_status() - - # Parse the HTML content of the page - soup = BeautifulSoup(response.text, 'html.parser') - - # Search for the package name in the parsed HTML - search_results = soup.find_all('span', class_='package-snippet__name') - for result in search_results: - if result.text.strip() == package_name: - return True - - # If the package name was not found in the search results, log an error and raise an exception - exception = ValueError(f"Package {package_name} does not exist on PyPI") - self.logger.error(exception) - raise exception - except requests.exceptions.RequestException as exception: - raise exception - - def _check_package_exists_npm(self, package_name): - try: - response = requests.get(f"https://www.npmjs.com/package/{package_name}") - if response.status_code == 200: - self.logger.info(f"Package {package_name} exists on npm website") - return True - else: - self.logger.info(f"Package {package_name} does not exist on npm website") - return False - except requests.exceptions.RequestException as exception: - self.logger.error(f"Failed to check package existence on npm website: {exception}") - raise exception + def _install_package_with_pip3(self, package_name): + try: + subprocess.check_call([self.pip3_command, "install", package_name]) + self.logger.info(f"Successfully installed package with pip3: {package_name}") + except subprocess.CalledProcessError as exception: + self.logger.error(f"Failed to install package with pip3: {package_name}") + raise exception + + def _install_package_with_npm(self, package_name): + try: + subprocess.check_call([self.npm_command, "install", package_name]) + self.logger.info(f"Successfully installed package with npm: {package_name}") + except subprocess.CalledProcessError as exception: + self.logger.error(f"Failed to install package with npm: {package_name}") + raise exception + + def _is_package_installed(self, package_name, package_manager): + if package_manager == "pip": + try: + subprocess.check_call([self.pip_command, "show", package_name]) + self.logger.info(f"Package {package_name} is installed") + return True + except subprocess.CalledProcessError: + try: + subprocess.check_call([self.pip3_command, "show", package_name]) + self.logger.info(f"Package {package_name} is installed") + return True + except subprocess.CalledProcessError: + self.logger.info(f"Package {package_name} is not installed") + return False + elif package_manager == "npm": + try: + subprocess.check_call([self.npm_command, "list", "-g", package_name]) + self.logger.info(f"Package {package_name} is installed") + return True + except subprocess.CalledProcessError: + self.logger.info(f"Package {package_name} is not installed") + return False + else: + exception = ValueError("Invalid package manager selected.") + self.logger.error(exception) + raise exception + + def _install_package_with_npm(self, package_name): + try: + subprocess.check_call([self.npm_command, "install", package_name]) + self.logger.info(f"Successfully installed package with npm: {package_name}") + except subprocess.CalledProcessError as exception: + self.logger.error(f"Failed to install package with npm: {package_name}") + raise exception + + + def _extract_python_package_name(self, error_message): + # Regular expression pattern to match the error message + pattern = r"ModuleNotFoundError: No module named '(\w+)'|ModuleNotFoundError: '(\w+)'" + match = re.search(pattern, error_message) + if match: + # Extract the package name from the error message + package_name = match.group(1) if match.group(1) else match.group(2) + return package_name + else: + # If the package name could not be extracted, log an error and raise an exception + exception = ValueError("Could not extract package name from error message") + self.logger.error(exception) + raise exception + + def _extract_javascript_package_name(self,error_message): + try: + lines = error_message.split('\n') + for line in lines: + if line.startswith("Error: Cannot find module"): + package_name = line.split("'")[1] + return package_name + return None + except Exception as exception: + self.logger.error(f"Failed to extract package name from error message: {exception}") + raise exception + + def _check_package_exists_pip(self,package_name): + import requests + from bs4 import BeautifulSoup + + try: + # Send a GET request to the PyPI search page + response = requests.get(f"https://pypi.org/search/?q={package_name}") + response.raise_for_status() + + # Parse the HTML content of the page + soup = BeautifulSoup(response.text, 'html.parser') + + # Search for the package name in the parsed HTML + search_results = soup.find_all('span', class_='package-snippet__name') + for result in search_results: + if result.text.strip() == package_name: + return True + + # If the package name was not found in the search results, log an error and raise an exception + exception = ValueError(f"Package {package_name} does not exist on PyPI") + self.logger.error(exception) + raise exception + except requests.exceptions.RequestException as exception: + raise exception + + def _check_package_exists_npm(self, package_name): + try: + response = requests.get(f"https://www.npmjs.com/package/{package_name}") + if response.status_code == 200: + self.logger.info(f"Package {package_name} exists on npm website") + return True + else: + self.logger.info(f"Package {package_name} does not exist on npm website") + return False + except requests.exceptions.RequestException as exception: + self.logger.error(f"Failed to check package existence on npm website: {exception}") + raise exception diff --git a/libs/utility_manager.py b/libs/utility_manager.py index 801779f..da92fe6 100644 --- a/libs/utility_manager.py +++ b/libs/utility_manager.py @@ -12,292 +12,292 @@ class UtilityManager: - logger = None + logger = None - def __init__(self): - try: - if not os.path.exists('logs'): - os.makedirs('logs') - if not os.path.isfile('logs/interpreter.log'): - open('logs/interpreter.log', 'w').close() - except Exception as exception: - self.logger.error(f"Error in UtilityManager initialization: {str(exception)}") - raise - self.logger = Logger.initialize_logger("logs/interpreter.log") + def __init__(self): + try: + if not os.path.exists('logs'): + os.makedirs('logs') + if not os.path.isfile('logs/interpreter.log'): + open('logs/interpreter.log', 'w').close() + except Exception as exception: + self.logger.error(f"Error in UtilityManager initialization: {str(exception)}") + raise + self.logger = Logger.initialize_logger("logs/interpreter.log") - def _open_resource_file(self, filename): - try: - if os.path.isfile(filename): - if platform.system() == "Windows": - subprocess.call(['start', filename], shell=True) - elif platform.system() == "Darwin": - subprocess.call(['open', filename]) - elif platform.system() == "Linux": - subprocess.call(['xdg-open', filename]) - self.logger.info(f"{filename} exists and opened successfully") - except Exception as exception: - display_markdown_message(f"Error in opening files: {str(exception)}") + def _open_resource_file(self, filename): + try: + if os.path.isfile(filename): + if platform.system() == "Windows": + subprocess.call(['start', filename], shell=True) + elif platform.system() == "Darwin": + subprocess.call(['open', filename]) + elif platform.system() == "Linux": + subprocess.call(['xdg-open', filename]) + self.logger.info(f"{filename} exists and opened successfully") + except Exception as exception: + display_markdown_message(f"Error in opening files: {str(exception)}") - def _clean_responses(self): - files_to_remove = ['graph.png', 'chart.png', 'table.md'] - for file in files_to_remove: - try: - if os.path.isfile(file): - os.remove(file) - self.logger.info(f"{file} removed successfully") - except Exception as e: - print(f"Error in removing {file}: {str(e)}") - - def _extract_content(self, output): - try: - return output['choices'][0]['message']['content'] - except (KeyError, TypeError) as e: - self.logger.error(f"Error extracting content: {str(e)}") - raise - - def get_os_platform(self): - try: - os_info = platform.uname() - os_name = os_info.system - os_version = os_info.release + def _clean_responses(self): + files_to_remove = ['graph.png', 'chart.png', 'table.md'] + for file in files_to_remove: + try: + if os.path.isfile(file): + os.remove(file) + self.logger.info(f"{file} removed successfully") + except Exception as e: + print(f"Error in removing {file}: {str(e)}") + + def _extract_content(self, output): + try: + return output['choices'][0]['message']['content'] + except (KeyError, TypeError) as e: + self.logger.error(f"Error extracting content: {str(e)}") + raise + + def get_os_platform(self): + try: + os_info = platform.uname() + os_name = os_info.system + os_version = os_info.release - if os_name == 'Linux': - # Attempt to get distribution info - try: - import distro - distro_info = distro.info() - os_name = f"{os_name} ({distro_info['id']} {distro_info['version_parts']['major']})" # e.g., "Linux (ubuntu 22)" - except ImportError: - self.logger.warning("distro package not found. Linux distribution details will be less specific.") - # Fallback if distro is not installed - os_name = f"{os_name} ({os_version})" - elif os_name == 'Windows': - os_name = f"{os_name} {platform.version()}" - elif os_name == 'Darwin': # macOS - os_name = f"{os_name} {platform.mac_ver()[0]}" + if os_name == 'Linux': + # Attempt to get distribution info + try: + import distro + distro_info = distro.info() + os_name = f"{os_name} ({distro_info['id']} {distro_info['version_parts']['major']})" # e.g., "Linux (ubuntu 22)" + except ImportError: + self.logger.warning("distro package not found. Linux distribution details will be less specific.") + # Fallback if distro is not installed + os_name = f"{os_name} ({os_version})" + elif os_name == 'Windows': + os_name = f"{os_name} {platform.version()}" + elif os_name == 'Darwin': # macOS + os_name = f"{os_name} {platform.mac_ver()[0]}" - self.logger.info(f"Operating System: {os_name}") - return os_name, os_info.version - except Exception as exception: - self.logger.error(f"Error in getting OS platform: {str(exception)}") - raise + self.logger.info(f"Operating System: {os_name}") + return os_name, os_info.version + except Exception as exception: + self.logger.error(f"Error in getting OS platform: {str(exception)}") + raise - def initialize_readline_history(self): - try: - # Checking the OS type - # If it's posix (Unix-like), import readline for handling lines from input - # If it's not posix, import pyreadline as readline - if os.name == 'posix': - import readline - else: - import pyreadline as readline - - histfile = os.path.join(os.path.expanduser("~"), ".python_history") - - # Check if histfile exists before trying to read it - if os.path.exists(histfile): - readline.read_history_file(histfile) - - # Save history to file on exit - import atexit - atexit.register(readline.write_history_file, histfile) - - except FileNotFoundError: - raise Exception("History file not found") - - except AttributeError: - # Handle error on Windows where pyreadline doesn't have read_history_file - self.logger.info("pyreadline doesn't have read_history_file") - raise Exception("On Windows, pyreadline doesn't have read_history_file") - except Exception as exception: - self.logger.error(f"Error in initializing readline history: {str(exception)}") - raise + def initialize_readline_history(self): + try: + # Checking the OS type + # If it's posix (Unix-like), import readline for handling lines from input + # If it's not posix, import pyreadline as readline + if os.name == 'posix': + import readline + else: + import pyreadline as readline + + histfile = os.path.join(os.path.expanduser("~"), ".python_history") + + # Check if histfile exists before trying to read it + if os.path.exists(histfile): + readline.read_history_file(histfile) + + # Save history to file on exit + import atexit + atexit.register(readline.write_history_file, histfile) + + except FileNotFoundError: + raise Exception("History file not found") + + except AttributeError: + # Handle error on Windows where pyreadline doesn't have read_history_file + self.logger.info("pyreadline doesn't have read_history_file") + raise Exception("On Windows, pyreadline doesn't have read_history_file") + except Exception as exception: + self.logger.error(f"Error in initializing readline history: {str(exception)}") + raise - def read_config_file(self, filename=".config"): - try: - config_data = {} - with open(filename, "r") as config_file: - for line in config_file: - # Ignore comments and lines without an equals sign - if line.strip().startswith('#') or '=' not in line: - continue - key, value = line.strip().split("=") - config_data[key.strip()] = value.strip() - return config_data - except Exception as exception: - self.logger.error(f"Error in reading config file: {str(exception)}") - raise + def read_config_file(self, filename=".config"): + try: + config_data = {} + with open(filename, "r") as config_file: + for line in config_file: + # Ignore comments and lines without an equals sign + if line.strip().startswith('#') or '=' not in line: + continue + key, value = line.strip().split("=") + config_data[key.strip()] = value.strip() + return config_data + except Exception as exception: + self.logger.error(f"Error in reading config file: {str(exception)}") + raise - def extract_file_name(self, prompt): - try: - # This pattern looks for typical file paths, names, and URLs, then stops at the end of the extension - pattern = r"((?:[a-zA-Z]:\\(?:[\w\-\.]+\\)*|/(?:[\w\-\.]+/)*|\b[\w\-\.]+\b|https?://[\w\-\.]+/[\w\-\.]+/)*[\w\-\.]+\.\w+)" - match = re.search(pattern, prompt) + def extract_file_name(self, prompt): + try: + # This pattern looks for typical file paths, names, and URLs, then stops at the end of the extension + pattern = r"((?:[a-zA-Z]:\\(?:[\w\-\.]+\\)*|/(?:[\w\-\.]+/)*|\b[\w\-\.]+\b|https?://[\w\-\.]+/[\w\-\.]+/)*[\w\-\.]+\.\w+)" + match = re.search(pattern, prompt) - # Return the matched file name or path, if any match found - if match: - file_name = match.group() - file_extension = os.path.splitext(file_name)[1].lower() - self.logger.info(f"File extension: '{file_extension}'") - # Check if the file extension is one of the non-binary types - if file_extension in ['.json', '.csv', '.xml', '.xls', '.txt','.md','.html','.png','.jpg','.jpeg','.gif','.svg','.zip','.tar','.gz','.7z','.rar']: - self.logger.info(f"Extracted File name: '{file_name}'") - return file_name - else: - return None - else: - return None - except Exception as exception: - self.logger.error(f"Error in extracting file name: {str(exception)}") - raise + # Return the matched file name or path, if any match found + if match: + file_name = match.group() + file_extension = os.path.splitext(file_name)[1].lower() + self.logger.info(f"File extension: '{file_extension}'") + # Check if the file extension is one of the non-binary types + if file_extension in ['.json', '.csv', '.xml', '.xls', '.txt','.md','.html','.png','.jpg','.jpeg','.gif','.svg','.zip','.tar','.gz','.7z','.rar']: + self.logger.info(f"Extracted File name: '{file_name}'") + return file_name + else: + return None + else: + return None + except Exception as exception: + self.logger.error(f"Error in extracting file name: {str(exception)}") + raise - def get_full_file_path(self, file_name): - if not file_name: - return None + def get_full_file_path(self, file_name): + if not file_name: + return None - # Check if the file path is absolute. If not, prepend the current working directory - if not os.path.isabs(file_name): - return os.path.join(os.getcwd(), file_name) - return file_name - - def read_csv_headers(self, file_path): - try: - with open(file_path, newline='') as csvfile: - reader = csv.reader(csvfile) - headers = next(reader) - return headers - except IOError as exception: - self.logger.error(f"IOError: {exception}") - return [] - except StopIteration: - self.logger.error("CSV file is empty.") - return [] + # Check if the file path is absolute. If not, prepend the current working directory + if not os.path.isabs(file_name): + return os.path.join(os.getcwd(), file_name) + return file_name + + def read_csv_headers(self, file_path): + try: + with open(file_path, newline='') as csvfile: + reader = csv.reader(csvfile) + headers = next(reader) + return headers + except IOError as exception: + self.logger.error(f"IOError: {exception}") + return [] + except StopIteration: + self.logger.error("CSV file is empty.") + return [] - def get_code_history(self, language='python'): - try: - self.logger.info("Starting to read last code history.") - output_folder = "output" - file_extension = 'py' if language == 'python' else 'js' - self.logger.info(f"Looking for files with extension: {file_extension}") + def get_code_history(self, language='python'): + try: + self.logger.info("Starting to read last code history.") + output_folder = "output" + file_extension = 'py' if language == 'python' else 'js' + self.logger.info(f"Looking for files with extension: {file_extension}") - # Get a list of all files in the output folder with the correct extension - files = glob.glob(os.path.join(output_folder, f"*.{file_extension}")) - self.logger.info(f"Found {len(files)} files.") + # Get a list of all files in the output folder with the correct extension + files = glob.glob(os.path.join(output_folder, f"*.{file_extension}")) + self.logger.info(f"Found {len(files)} files.") - # Sort the files by date - files.sort(key=lambda x: datetime.strptime(x.split('_', 1)[1].rsplit('.', 1)[0], '%Y_%m_%d-%H_%M_%S'), reverse=True) - self.logger.info("Files sorted by date.") + # Sort the files by date + files.sort(key=lambda x: datetime.strptime(x.split('_', 1)[1].rsplit('.', 1)[0], '%Y_%m_%d-%H_%M_%S'), reverse=True) + self.logger.info("Files sorted by date.") - # Return the latest file - latest_file = files[0] if files else None - self.logger.info(f"Latest file: {latest_file}") + # Return the latest file + latest_file = files[0] if files else None + self.logger.info(f"Latest file: {latest_file}") - # Read the file and return the code - if latest_file: - with open(latest_file, "r") as code_file: - code = code_file.read() - return latest_file, code + # Read the file and return the code + if latest_file: + with open(latest_file, "r") as code_file: + code = code_file.read() + return latest_file, code - except Exception as exception: - self.logger.error(f"Error in reading last code history: {str(exception)}") - raise + except Exception as exception: + self.logger.error(f"Error in reading last code history: {str(exception)}") + raise - def display_help(self): - display_markdown_message("Interpreter\n\ - \n\ - Commands available:\n\ - \n\ - /exit - Exit the interpreter.\n\ - /execute - Execute the last code generated.\n\ - /install - Install a package from npm or pip.\n\ - /save - Save the last code generated.\n\ - /edit - Edit the last code generated.\n\ - /debug - Debug the last code generated.\n\ - /mode - Change the mode of interpreter.\n\ - /model - Change the model for interpreter.\n\ - /language - Change the language of the interpreter.\n\ - /history - Use history as memory.\n\ - /clear - Clear the screen.\n\ - /help - Display this help message.\n\ - /list - List the available models.\n\ - /version - Display the version of the interpreter.\n\ - /log - Switch between verbose and silent mode.\n\ - /prompt - Switch input prompt mode between file and prompt.\n\ - /upgrade - Upgrade the interpreter.\n\ - /shell - Access the shell.\n") - - def display_version(self, version): - display_markdown_message(f"Interpreter - v{version}") + def display_help(self): + display_markdown_message("Interpreter\n\ + \n\ + Commands available:\n\ + \n\ + /exit - Exit the interpreter.\n\ + /execute - Execute the last code generated.\n\ + /install - Install a package from npm or pip.\n\ + /save - Save the last code generated.\n\ + /edit - Edit the last code generated.\n\ + /debug - Debug the last code generated.\n\ + /mode - Change the mode of interpreter.\n\ + /model - Change the model for interpreter.\n\ + /language - Change the language of the interpreter.\n\ + /history - Use history as memory.\n\ + /clear - Clear the screen.\n\ + /help - Display this help message.\n\ + /list - List the available models.\n\ + /version - Display the version of the interpreter.\n\ + /log - Switch between verbose and silent mode.\n\ + /prompt - Switch input prompt mode between file and prompt.\n\ + /upgrade - Upgrade the interpreter.\n\ + /shell - Access the shell.\n") + + def display_version(self, version): + display_markdown_message(f"Interpreter - v{version}") - def clear_screen(self): - os.system('cls' if os.name == 'nt' else 'clear') - - def create_file(self, file_path): - try: - with open(file_path, "w") as file: - file.write("") - except Exception as exception: - self.logger.error(f"Error in creating file: {str(exception)}") - raise - - def read_file(self, file_path): - try: - with open(file_path, "r") as file: - return file.read() - except Exception as exception: - self.logger.error(f"Error in reading file: {str(exception)}") - raise - - def write_file(self, file_path, content): - try: - with open(file_path, "w") as file: - file.write(content) - except Exception as exception: - self.logger.error(f"Error in writing file: {str(exception)}") - raise - - # method to download file from Web and save it - - @staticmethod - def _download_file(url, file_name): - try: - logger = Logger.initialize_logger("logs/interpreter.log") - import requests - logger.info(f"Downloading file: {url}") - response = requests.get(url, allow_redirects=True) - response.raise_for_status() - - with open(file_name, 'wb') as file: - file.write(response.content) - logger.info("Reuquirements.txt file downloaded.") - return True - except Exception as exception: - logger.error(f"Error in downloading file: {str(exception)}") - return False + def clear_screen(self): + os.system('cls' if os.name == 'nt' else 'clear') + + def create_file(self, file_path): + try: + with open(file_path, "w") as file: + file.write("") + except Exception as exception: + self.logger.error(f"Error in creating file: {str(exception)}") + raise + + def read_file(self, file_path): + try: + with open(file_path, "r") as file: + return file.read() + except Exception as exception: + self.logger.error(f"Error in reading file: {str(exception)}") + raise + + def write_file(self, file_path, content): + try: + with open(file_path, "w") as file: + file.write(content) + except Exception as exception: + self.logger.error(f"Error in writing file: {str(exception)}") + raise + + # method to download file from Web and save it + + @staticmethod + def _download_file(url, file_name): + try: + logger = Logger.initialize_logger("logs/interpreter.log") + import requests + logger.info(f"Downloading file: {url}") + response = requests.get(url, allow_redirects=True) + response.raise_for_status() + + with open(file_name, 'wb') as file: + file.write(response.content) + logger.info("Reuquirements.txt file downloaded.") + return True + except Exception as exception: + logger.error(f"Error in downloading file: {str(exception)}") + return False - @staticmethod - def upgrade_interpreter(): - code_interpreter = CodeInterpreter() - logger = Logger.initialize_logger("logs/interpreter.log") - # Download the requirements file - file_url = 'https://raw.githubusercontent.com/haseeb-heaven/code-interpreter/main/requirements.txt' - requirements_file_downloaded = UtilityManager._download_file(file_url, 'requirements.txt') - - # Commands to execute. - command_pip_upgrade = 'pip install open-code-interpreter --upgrade' - command_pip_requirements = 'pip install -r requirements.txt --upgrade' - - # Execute the commands. - command_output, _ = code_interpreter.execute_command(command_pip_upgrade) - display_markdown_message(f"Command Upgrade executed successfully.") - if requirements_file_downloaded: - command_output, _ = code_interpreter.execute_command(command_pip_requirements) - display_markdown_message(f"Command Requirements executed successfully.") - else: - logger.warn(f"Requirements file not downloaded.") - display_markdown_message(f"Warning: Requirements file not downloaded.") - - if command_output: - logger.info(f"Command executed successfully.") - display_code(command_output) - logger.info(f"Output: {command_output[:100]}") \ No newline at end of file + @staticmethod + def upgrade_interpreter(): + code_interpreter = CodeInterpreter() + logger = Logger.initialize_logger("logs/interpreter.log") + # Download the requirements file + file_url = 'https://raw.githubusercontent.com/haseeb-heaven/code-interpreter/main/requirements.txt' + requirements_file_downloaded = UtilityManager._download_file(file_url, 'requirements.txt') + + # Commands to execute. + command_pip_upgrade = 'pip install open-code-interpreter --upgrade' + command_pip_requirements = 'pip install -r requirements.txt --upgrade' + + # Execute the commands. + command_output, _ = code_interpreter.execute_command(command_pip_upgrade) + display_markdown_message(f"Command Upgrade executed successfully.") + if requirements_file_downloaded: + command_output, _ = code_interpreter.execute_command(command_pip_requirements) + display_markdown_message(f"Command Requirements executed successfully.") + else: + logger.warn(f"Requirements file not downloaded.") + display_markdown_message(f"Warning: Requirements file not downloaded.") + + if command_output: + logger.info(f"Command executed successfully.") + display_code(command_output) + logger.info(f"Output: {command_output[:100]}") \ No newline at end of file