diff --git a/web/api/views.py b/web/api/views.py index 8bcf46a84..5aa778692 100644 --- a/web/api/views.py +++ b/web/api/views.py @@ -1067,10 +1067,48 @@ def get(self, request): #save_db = True if 'save_db' in req.query_params else False response = {'status': False} try: - response = get_cms_details(url) + # response = get_cms_details(url) + response = {} + cms_detector_command = f'python3 /usr/src/github/CMSeeK/cmseek.py' + cms_detector_command += ' --random-agent --batch --follow-redirect' + cms_detector_command += f' -u {url}' + + _, output = run_command(cms_detector_command, remove_ansi_sequence=True) + + response['message'] = 'Could not detect CMS!' + + parsed_url = urlparse(url) + + domain_name = parsed_url.hostname + port = parsed_url.port + + find_dir = domain_name + + if port: + find_dir += '_{}'.format(port) + # look for result path in output + path_regex = r"Result: (\/usr\/src[^\"\s]*)" + match = re.search(path_regex, output) + if match: + cms_json_path = match.group(1) + if os.path.isfile(cms_json_path): + cms_file_content = json.loads(open(cms_json_path, 'r').read()) + if not cms_file_content.get('cms_id'): + return response + response = {} + response = cms_file_content + response['status'] = True + try: + # remove results + cms_dir_path = os.path.dirname(cms_json_path) + shutil.rmtree(cms_dir_path) + except Exception as e: + logger.error(e) + return Response(response) + return Response(response) except Exception as e: response = {'status': False, 'message': str(e)} - return Response(response) + return Response(response) class IPToDomain(APIView): diff --git a/web/reNgine/common_func.py b/web/reNgine/common_func.py index 2c7bdbd8a..c989f269c 100644 --- a/web/reNgine/common_func.py +++ b/web/reNgine/common_func.py @@ -4,7 +4,6 @@ import random import shutil import traceback -import uuid from time import sleep import humanize @@ -27,6 +26,7 @@ from startScan.models import * from targetApp.models import * + logger = get_task_logger(__name__) DISCORD_WEBHOOKS_CACHE = redis.Redis.from_url(CELERY_BROKER_URL) @@ -498,53 +498,13 @@ def get_random_proxy(): # os.environ['HTTPS_PROXY'] = proxy_name return proxy_name - -def get_cms_details(url): - """Get CMS details using cmseek.py. - - Args: - url (str): HTTP URL. - - Returns: - dict: Response. - """ - # this function will fetch cms details using cms_detector - response = {} - cms_detector_command = f'python3 /usr/src/github/CMSeeK/cmseek.py --random-agent --batch --follow-redirect -u {url}' - os.system(cms_detector_command) - - response['status'] = False - response['message'] = 'Could not detect CMS!' - - parsed_url = urlparse(url) - - domain_name = parsed_url.hostname - port = parsed_url.port - - find_dir = domain_name - - if port: - find_dir += '_{}'.format(port) - - # subdomain may also have port number, and is stored in dir as _port - - cms_dir_path = '/usr/src/github/CMSeeK/Result/{}'.format(find_dir) - cms_json_path = cms_dir_path + '/cms.json' - - if os.path.isfile(cms_json_path): - cms_file_content = json.loads(open(cms_json_path, 'r').read()) - if not cms_file_content.get('cms_id'): - return response - response = {} - response = cms_file_content - response['status'] = True - # remove cms dir path - try: - shutil.rmtree(cms_dir_path) - except Exception as e: - print(e) - - return response +def remove_ansi_escape_sequences(text): + # Regular expression to match ANSI escape sequences + ansi_escape_pattern = r'\x1b\[.*?m' + + # Use re.sub() to replace the ANSI escape sequences with an empty string + plain_text = re.sub(ansi_escape_pattern, '', text) + return plain_text #--------------------# diff --git a/web/reNgine/tasks.py b/web/reNgine/tasks.py index b1e185d28..4323fdf4c 100644 --- a/web/reNgine/tasks.py +++ b/web/reNgine/tasks.py @@ -4091,7 +4091,15 @@ def remove_duplicate_endpoints( logger.warning(msg) @app.task(name='run_command', bind=False, queue='run_command_queue') -def run_command(cmd, cwd=None, shell=False, history_file=None, scan_id=None, activity_id=None): +def run_command( + cmd, + cwd=None, + shell=False, + history_file=None, + scan_id=None, + activity_id=None, + remove_ansi_sequence=False + ): """Run a given command using subprocess module. Args: @@ -4100,7 +4108,7 @@ def run_command(cmd, cwd=None, shell=False, history_file=None, scan_id=None, act echo (bool): Log command. shell (bool): Run within separate shell if True. history_file (str): Write command + output to history file. - + remove_ansi_sequence (bool): Used to remove ANSI escape sequences from output such as color coding Returns: tuple: Tuple with return_code, output. """ @@ -4139,6 +4147,8 @@ def run_command(cmd, cwd=None, shell=False, history_file=None, scan_id=None, act mode = 'w' with open(history_file, mode) as f: f.write(f'\n{cmd}\n{return_code}\n{output}\n------------------\n') + if remove_ansi_sequence: + output = remove_ansi_escape_sequences(output) return return_code, output